Skip to content

Commit 2d55149

Browse files
feat: Return all bulk dataset processing errors (#25)
1 parent 4f8051a commit 2d55149

File tree

4 files changed

+592
-154
lines changed

4 files changed

+592
-154
lines changed

pre-compute/src/compute/app_runner.rs

Lines changed: 18 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -42,35 +42,33 @@ pub fn start_with_app<A: PreComputeAppTrait>(
4242
pre_compute_app: &mut A,
4343
chain_task_id: &str,
4444
) -> ExitMode {
45-
let exit_cause = match pre_compute_app.run() {
45+
let exit_causes = match pre_compute_app.run() {
4646
Ok(_) => {
4747
info!("TEE pre-compute completed");
4848
return ExitMode::Success;
4949
}
50-
Err(exit_cause) => {
51-
error!("TEE pre-compute failed with known exit cause [{exit_cause:?}]");
52-
exit_cause
50+
Err(exit_causes) => {
51+
error!("TEE pre-compute failed with known exit cause [{exit_causes:?}]");
52+
exit_causes
5353
}
5454
};
5555

5656
let authorization = match get_challenge(chain_task_id) {
5757
Ok(auth) => auth,
5858
Err(_) => {
59-
error!("Failed to sign exitCause message [{exit_cause:?}]");
59+
error!("Failed to sign exitCause message [{exit_causes:?}]");
6060
return ExitMode::UnreportedFailure;
6161
}
6262
};
6363

64-
let exit_causes = vec![exit_cause.clone()];
65-
6664
match WorkerApiClient::from_env().send_exit_causes_for_pre_compute_stage(
6765
&authorization,
6866
chain_task_id,
6967
&exit_causes,
7068
) {
7169
Ok(_) => ExitMode::ReportedFailure,
7270
Err(_) => {
73-
error!("Failed to report exitCause [{exit_cause:?}]");
71+
error!("Failed to report exitCause [{exit_causes:?}]");
7472
ExitMode::UnreportedFailure
7573
}
7674
}
@@ -150,7 +148,7 @@ mod pre_compute_start_with_app_tests {
150148

151149
let mut mock = MockPreComputeAppTrait::new();
152150
mock.expect_run()
153-
.returning(|| Err(ReplicateStatusCause::PreComputeWorkerAddressMissing));
151+
.returning(|| Err(vec![ReplicateStatusCause::PreComputeWorkerAddressMissing]));
154152

155153
temp_env::with_vars(env_vars_to_set, || {
156154
temp_env::with_vars_unset(env_vars_to_unset, || {
@@ -172,8 +170,11 @@ mod pre_compute_start_with_app_tests {
172170
let env_vars_to_unset = vec![ENV_SIGN_TEE_CHALLENGE_PRIVATE_KEY];
173171

174172
let mut mock = MockPreComputeAppTrait::new();
175-
mock.expect_run()
176-
.returning(|| Err(ReplicateStatusCause::PreComputeTeeChallengePrivateKeyMissing));
173+
mock.expect_run().returning(|| {
174+
Err(vec![
175+
ReplicateStatusCause::PreComputeTeeChallengePrivateKeyMissing,
176+
])
177+
});
177178

178179
temp_env::with_vars(env_vars_to_set, || {
179180
temp_env::with_vars_unset(env_vars_to_unset, || {
@@ -199,8 +200,11 @@ mod pre_compute_start_with_app_tests {
199200
let mock_server_addr_string = mock_server.address().to_string();
200201

201202
let mut mock = MockPreComputeAppTrait::new();
202-
mock.expect_run()
203-
.returning(|| Err(ReplicateStatusCause::PreComputeTeeChallengePrivateKeyMissing));
203+
mock.expect_run().returning(|| {
204+
Err(vec![
205+
ReplicateStatusCause::PreComputeTeeChallengePrivateKeyMissing,
206+
])
207+
});
204208

205209
let result_code = tokio::task::spawn_blocking(move || {
206210
let env_vars = vec![
@@ -248,7 +252,7 @@ mod pre_compute_start_with_app_tests {
248252
let mut mock = MockPreComputeAppTrait::new();
249253
mock.expect_run()
250254
.times(1)
251-
.returning(|| Err(ReplicateStatusCause::PreComputeOutputFolderNotFound));
255+
.returning(|| Err(vec![ReplicateStatusCause::PreComputeOutputFolderNotFound]));
252256

253257
// Move the blocking operations into spawn_blocking
254258
let result_code = tokio::task::spawn_blocking(move || {

pre-compute/src/compute/errors.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ pub enum ReplicateStatusCause {
2727
PreComputeInvalidTeeSignature,
2828
#[error("IS_DATASET_REQUIRED environment variable is missing")]
2929
PreComputeIsDatasetRequiredMissing,
30-
#[error("Input files download failed")]
31-
PreComputeInputFileDownloadFailed,
30+
#[error("Input file download failed for input {0}")]
31+
PreComputeInputFileDownloadFailed(String),
3232
#[error("Input files number related environment variable is missing")]
3333
PreComputeInputFilesNumberMissing,
3434
#[error("Invalid dataset checksum for dataset {0}")]

pre-compute/src/compute/pre_compute_app.rs

Lines changed: 81 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::compute::errors::ReplicateStatusCause;
22
use crate::compute::pre_compute_args::PreComputeArgs;
3+
use crate::compute::utils::env_utils::{TeeSessionEnvironmentVariable, get_env_var_or_error};
34
use crate::compute::utils::file_utils::{download_file, write_file};
45
use crate::compute::utils::hash_utils::sha256;
56
use log::{error, info};
@@ -9,9 +10,9 @@ use std::path::{Path, PathBuf};
910

1011
#[cfg_attr(test, automock)]
1112
pub trait PreComputeAppTrait {
12-
fn run(&mut self) -> Result<(), ReplicateStatusCause>;
13+
fn run(&mut self) -> Result<(), Vec<ReplicateStatusCause>>;
1314
fn check_output_folder(&self) -> Result<(), ReplicateStatusCause>;
14-
fn download_input_files(&self) -> Result<(), ReplicateStatusCause>;
15+
fn download_input_files(&self) -> Result<(), Vec<ReplicateStatusCause>>;
1516
fn save_plain_dataset_file(
1617
&self,
1718
plain_content: &[u8],
@@ -37,15 +38,19 @@ impl PreComputeAppTrait for PreComputeApp {
3738
/// Runs the complete pre-compute pipeline.
3839
///
3940
/// This method orchestrates the entire pre-compute process:
40-
/// 1. Reads configuration arguments
41-
/// 2. Validates the output folder exists
42-
/// 3. Downloads and decrypts the dataset (if required)
43-
/// 4. Downloads all input files
41+
/// 1. Reads the output directory from environment variable `IEXEC_PRE_COMPUTE_OUT`
42+
/// 2. Reads and validates configuration arguments from environment variables
43+
/// 3. Validates the output folder exists
44+
/// 4. Downloads and decrypts all datasets (if required)
45+
/// 5. Downloads all input files
46+
///
47+
/// The method collects all errors encountered during execution and returns them together,
48+
/// allowing partial completion when possible (e.g., if one dataset fails, others are still processed).
4449
///
4550
/// # Returns
4651
///
4752
/// - `Ok(())` if all operations completed successfully
48-
/// - `Err(ReplicateStatusCause)` if any step failed
53+
/// - `Err(Vec<ReplicateStatusCause>)` containing all errors encountered during execution
4954
///
5055
/// # Example
5156
///
@@ -55,17 +60,46 @@ impl PreComputeAppTrait for PreComputeApp {
5560
/// let mut app = PreComputeApp::new("task_id".to_string());
5661
/// app.run();
5762
/// ```
58-
fn run(&mut self) -> Result<(), ReplicateStatusCause> {
59-
// TODO: Collect all errors instead of propagating immediately, and return the list of errors
60-
self.pre_compute_args = PreComputeArgs::read_args()?;
61-
self.check_output_folder()?;
63+
fn run(&mut self) -> Result<(), Vec<ReplicateStatusCause>> {
64+
let (mut args, mut exit_causes): (PreComputeArgs, Vec<ReplicateStatusCause>);
65+
match get_env_var_or_error(
66+
TeeSessionEnvironmentVariable::IexecPreComputeOut,
67+
ReplicateStatusCause::PreComputeOutputPathMissing,
68+
) {
69+
Ok(output_dir) => {
70+
(args, exit_causes) = PreComputeArgs::read_args();
71+
args.output_dir = output_dir;
72+
}
73+
Err(e) => {
74+
error!("Failed to read output directory: {e:?}");
75+
return Err(vec![e]);
76+
}
77+
};
78+
self.pre_compute_args = args;
79+
80+
if let Err(exit_cause) = self.check_output_folder() {
81+
return Err(vec![exit_cause]);
82+
}
83+
6284
for dataset in self.pre_compute_args.datasets.iter() {
63-
let encrypted_content = dataset.download_encrypted_dataset(&self.chain_task_id)?;
64-
let plain_content = dataset.decrypt_dataset(&encrypted_content)?;
65-
self.save_plain_dataset_file(&plain_content, &dataset.filename)?;
85+
if let Err(exit_cause) = dataset
86+
.download_encrypted_dataset(&self.chain_task_id)
87+
.and_then(|encrypted_content| dataset.decrypt_dataset(&encrypted_content))
88+
.and_then(|plain_content| {
89+
self.save_plain_dataset_file(&plain_content, &dataset.filename)
90+
})
91+
{
92+
exit_causes.push(exit_cause);
93+
};
94+
}
95+
if let Err(exit_cause) = self.download_input_files() {
96+
exit_causes.extend(exit_cause);
97+
};
98+
if !exit_causes.is_empty() {
99+
Err(exit_causes)
100+
} else {
101+
Ok(())
66102
}
67-
self.download_input_files()?;
68-
Ok(())
69103
}
70104

71105
/// Checks whether the output folder specified in `pre_compute_args` exists.
@@ -93,31 +127,40 @@ impl PreComputeAppTrait for PreComputeApp {
93127
/// Downloads the input files listed in `pre_compute_args.input_files` to the specified `output_dir`.
94128
///
95129
/// Each URL is hashed (SHA-256) to generate a unique local filename.
96-
/// If any download fails, the function returns an error.
130+
/// The method continues downloading all files even if some downloads fail.
97131
///
98-
/// # Returns
132+
/// # Behavior
99133
///
100-
/// - `Ok(())` if all files are downloaded successfully.
101-
/// - `Err(ReplicateStatusCause::PreComputeInputFileDownloadFailed)` if any file fails to download.
134+
/// - Downloads continue even when individual files fail
135+
/// - Successfully downloaded files are saved with SHA-256 hashed filenames
136+
/// - All download failures are collected and returned together
102137
///
103-
/// # Panics
138+
/// # Returns
104139
///
105-
/// This function panics if:
106-
/// - `pre_compute_args` is `None`.
107-
/// - `chain_task_id` is `None`.
108-
fn download_input_files(&self) -> Result<(), ReplicateStatusCause> {
140+
/// - `Ok(())` if all files are downloaded successfully
141+
/// - `Err(Vec<ReplicateStatusCause>)` containing a `PreComputeInputFileDownloadFailed` error
142+
/// for each file that failed to download
143+
fn download_input_files(&self) -> Result<(), Vec<ReplicateStatusCause>> {
144+
let mut exit_causes: Vec<ReplicateStatusCause> = Vec::new();
109145
let args = &self.pre_compute_args;
110146
let chain_task_id: &str = &self.chain_task_id;
111147

112-
for url in &args.input_files {
148+
for url in args.input_files.iter() {
113149
info!("Downloading input file [chainTaskId:{chain_task_id}, url:{url}]");
114150

115151
let filename = sha256(url.to_string());
116152
if download_file(url, &args.output_dir, &filename).is_none() {
117-
return Err(ReplicateStatusCause::PreComputeInputFileDownloadFailed);
153+
exit_causes.push(ReplicateStatusCause::PreComputeInputFileDownloadFailed(
154+
url.to_string(),
155+
));
118156
}
119157
}
120-
Ok(())
158+
159+
if !exit_causes.is_empty() {
160+
Err(exit_causes)
161+
} else {
162+
Ok(())
163+
}
121164
}
122165

123166
/// Saves the decrypted (plain) dataset to disk in the configured output directory.
@@ -293,12 +336,14 @@ mod tests {
293336
let result = app.download_input_files();
294337
assert_eq!(
295338
result.unwrap_err(),
296-
ReplicateStatusCause::PreComputeInputFileDownloadFailed
339+
vec![ReplicateStatusCause::PreComputeInputFileDownloadFailed(
340+
"https://invalid-url-that-should-fail.com/file.txt".to_string()
341+
)]
297342
);
298343
}
299344

300345
#[test]
301-
fn test_partial_failure_stops_on_first_error() {
346+
fn test_partial_failure_dont_stops_on_first_error() {
302347
let (_container, json_url, xml_url) = start_container();
303348

304349
let temp_dir = TempDir::new().unwrap();
@@ -307,24 +352,26 @@ mod tests {
307352
vec![
308353
&json_url, // This should succeed
309354
"https://invalid-url-that-should-fail.com/file.txt", // This should fail
310-
&xml_url, // This shouldn't be reached
355+
&xml_url, // This should succeed
311356
],
312357
temp_dir.path().to_str().unwrap(),
313358
);
314359

315360
let result = app.download_input_files();
316361
assert_eq!(
317362
result.unwrap_err(),
318-
ReplicateStatusCause::PreComputeInputFileDownloadFailed
363+
vec![ReplicateStatusCause::PreComputeInputFileDownloadFailed(
364+
"https://invalid-url-that-should-fail.com/file.txt".to_string()
365+
)]
319366
);
320367

321368
// First file should be downloaded with SHA256 filename
322369
let json_hash = sha256(json_url);
323370
assert!(temp_dir.path().join(json_hash).exists());
324371

325-
// Third file should NOT be downloaded (stopped on second failure)
372+
// Third file should be downloaded (not stopped on second failure)
326373
let xml_hash = sha256(xml_url);
327-
assert!(!temp_dir.path().join(xml_hash).exists());
374+
assert!(temp_dir.path().join(xml_hash).exists());
328375
}
329376
// endregion
330377

0 commit comments

Comments
 (0)