
Commit c4ce1e1

Cluster error handling (#621)
* cluster error handling
* fix error read
* [pre-commit.ci] auto fixes from pre-commit.com hooks; for more information, see https://pre-commit.ci
* Add additional tests
* extend error handling
* folder error
* fix testing with flux
* another test
* hit cache again
* in interactive mode only successful calculations are stored
* clean up

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent 8f1026f commit c4ce1e1
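
In practical terms, this commit makes an exception raised inside a submitted function get written to the cache file and re-raised when the corresponding future's result is requested. A minimal sketch of the user-facing behavior, modeled on the new test_cache_error test further down (the "./cache_error" directory name is arbitrary; the cloudpickle_register call the test uses to pickle the locally defined function by value is omitted here):

from executorlib import SingleNodeExecutor


def get_error(a):
    # a deliberately failing task, mirroring the helper added in the tests
    raise ValueError(a)


with SingleNodeExecutor(cache_directory="./cache_error") as exe:
    future = exe.submit(get_error, a=1)
    try:
        future.result()  # the cached ValueError is re-raised here
    except ValueError as error:
        print("task failed with:", error)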

File tree

9 files changed: +137 -24 lines changed

executorlib/backend/cache_parallel.py

Lines changed: 17 additions & 6 deletions
@@ -39,13 +39,24 @@ def main() -> None:
         apply_dict = backend_load_file(file_name=file_name)
     apply_dict = MPI.COMM_WORLD.bcast(apply_dict, root=0)
     output = apply_dict["fn"].__call__(*apply_dict["args"], **apply_dict["kwargs"])
-    result = MPI.COMM_WORLD.gather(output, root=0) if mpi_size_larger_one else output
-    if mpi_rank_zero:
-        backend_write_file(
-            file_name=file_name,
-            output=result,
-            runtime=time.time() - time_start,
+    try:
+        result = (
+            MPI.COMM_WORLD.gather(output, root=0) if mpi_size_larger_one else output
         )
+    except Exception as error:
+        if mpi_rank_zero:
+            backend_write_file(
+                file_name=file_name,
+                output={"error": error},
+                runtime=time.time() - time_start,
+            )
+    else:
+        if mpi_rank_zero:
+            backend_write_file(
+                file_name=file_name,
+                output={"result": result},
+                runtime=time.time() - time_start,
+            )
     MPI.COMM_WORLD.Barrier()

executorlib/cache/backend.py

Lines changed: 19 additions & 5 deletions
@@ -44,10 +44,16 @@ def backend_write_file(file_name: str, output: Any, runtime: float) -> None:
     """
     file_name_out = os.path.splitext(file_name)[0]
     os.rename(file_name, file_name_out + ".h5ready")
-    dump(
-        file_name=file_name_out + ".h5ready",
-        data_dict={"output": output, "runtime": runtime},
-    )
+    if "result" in output:
+        dump(
+            file_name=file_name_out + ".h5ready",
+            data_dict={"output": output["result"], "runtime": runtime},
+        )
+    else:
+        dump(
+            file_name=file_name_out + ".h5ready",
+            data_dict={"error": output["error"], "runtime": runtime},
+        )
     os.rename(file_name_out + ".h5ready", file_name_out + ".h5out")

@@ -63,7 +69,15 @@ def backend_execute_task_in_file(file_name: str) -> None:
     """
     apply_dict = backend_load_file(file_name=file_name)
     time_start = time.time()
-    result = apply_dict["fn"].__call__(*apply_dict["args"], **apply_dict["kwargs"])
+    try:
+        result = {
+            "result": apply_dict["fn"].__call__(
+                *apply_dict["args"], **apply_dict["kwargs"]
+            )
+        }
+    except Exception as error:
+        result = {"error": error}
 
     backend_write_file(
         file_name=file_name,
         output=result,

executorlib/cache/shared.py

Lines changed: 8 additions & 4 deletions
@@ -30,9 +30,11 @@ def result(self) -> Any:
             str: The result of the future item.
 
         """
-        exec_flag, result = get_output(file_name=self._file_name)
-        if exec_flag:
+        exec_flag, no_error_flag, result = get_output(file_name=self._file_name)
+        if exec_flag and no_error_flag:
             return result
+        elif exec_flag:
+            raise result
         else:
             return self.result()

@@ -198,9 +200,11 @@ def _check_task_output(
     file_name = os.path.join(cache_directory, task_key, "cache.h5out")
     if not os.path.exists(file_name):
         return future_obj
-    exec_flag, result = get_output(file_name=file_name)
-    if exec_flag:
+    exec_flag, no_error_flag, result = get_output(file_name=file_name)
+    if exec_flag and no_error_flag:
         future_obj.set_result(result)
+    elif exec_flag:
+        future_obj.set_exception(result)
     return future_obj

executorlib/interactive/shared.py

Lines changed: 1 addition & 1 deletion
@@ -171,7 +171,7 @@ def _execute_task_with_cache(
         else:
             _task_done(future_queue=future_queue)
     else:
-        _, result = get_output(file_name=file_name)
+        _, _, result = get_output(file_name=file_name)
         future = task_dict["future"]
         future.set_result(result)
         _task_done(future_queue=future_queue)

executorlib/standalone/hdf.py

Lines changed: 7 additions & 4 deletions
@@ -10,6 +10,7 @@
     "args": "input_args",
     "kwargs": "input_kwargs",
     "output": "output",
+    "error": "error",
     "runtime": "runtime",
     "queue_id": "queue_id",
 }

@@ -60,21 +61,23 @@ def load(file_name: str) -> dict:
     return data_dict
 
 
-def get_output(file_name: str) -> tuple[bool, Any]:
+def get_output(file_name: str) -> tuple[bool, bool, Any]:
     """
     Check if output is available in the HDF5 file
 
     Args:
         file_name (str): file name of the HDF5 file as absolute path
 
     Returns:
-        Tuple[bool, object]: boolean flag indicating if output is available and the output object itself
+        Tuple[bool, bool, object]: boolean flag indicating if output is available and the output object itself
     """
     with h5py.File(file_name, "r") as hdf:
         if "output" in hdf:
-            return True, cloudpickle.loads(np.void(hdf["/output"]))
+            return True, True, cloudpickle.loads(np.void(hdf["/output"]))
+        elif "error" in hdf:
+            return True, False, cloudpickle.loads(np.void(hdf["/error"]))
         else:
-            return False, None
+            return False, False, None
 
 
 def get_runtime(file_name: str) -> float:
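
The new three-value return of get_output is consumed as shown in executorlib/cache/shared.py above; for illustration, a minimal caller-side sketch (the "cache.h5out" path is a placeholder for a file written by the cache backend):

from executorlib.standalone.hdf import get_output

# exec_flag: the file already holds a finished record ("output" or "error" dataset)
# no_error_flag: the record is a successful result rather than a stored exception
exec_flag, no_error_flag, result = get_output(file_name="cache.h5out")
if exec_flag and no_error_flag:
    print("result:", result)  # the unpickled return value
elif exec_flag:
    raise result  # re-raise the cached exception, as FutureItem.result() does
else:
    print("no output available yet")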

tests/test_cache_backend_execute.py

Lines changed: 34 additions & 0 deletions
@@ -19,6 +19,10 @@ def my_funct(a, b):
     return a + b
 
 
+def get_error(a):
+    raise ValueError(a)
+
+
 @unittest.skipIf(
     skip_h5io_test, "h5io is not installed, so the h5io tests are skipped."
 )

@@ -107,6 +111,36 @@ def test_execute_function_kwargs(self):
         self.assertTrue(future_file_obj.done())
         self.assertEqual(future_file_obj.result(), 3)
 
+    def test_execute_function_error(self):
+        cache_directory = os.path.abspath("cache")
+        os.makedirs(cache_directory, exist_ok=True)
+        task_key, data_dict = serialize_funct_h5(
+            fn=get_error,
+            fn_args=[],
+            fn_kwargs={"a": 1},
+        )
+        file_name = os.path.join(cache_directory, task_key, "cache.h5in")
+        os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True)
+        dump(file_name=file_name, data_dict=data_dict)
+        backend_execute_task_in_file(file_name=file_name)
+        future_obj = Future()
+        _check_task_output(
+            task_key=task_key, future_obj=future_obj, cache_directory=cache_directory
+        )
+        self.assertTrue(future_obj.done())
+        with self.assertRaises(ValueError):
+            future_obj.result()
+        self.assertTrue(
+            get_runtime(file_name=os.path.join(cache_directory, task_key, "cache.h5out"))
+            > 0.0
+        )
+        future_file_obj = FutureItem(
+            file_name=os.path.join(cache_directory, task_key, "cache.h5out")
+        )
+        self.assertTrue(future_file_obj.done())
+        with self.assertRaises(ValueError):
+            future_file_obj.result()
+
     def tearDown(self):
         if os.path.exists("cache"):
             shutil.rmtree("cache")

tests/test_cache_fileexecutor_serial.py

Lines changed: 13 additions & 0 deletions
@@ -27,6 +27,10 @@ def list_files_in_working_directory():
     return os.listdir(os.getcwd())
 
 
+def get_error(a):
+    raise ValueError(a)
+
+
 @unittest.skipIf(
     skip_h5py_test, "h5py is not installed, so the h5py tests are skipped."
 )

@@ -68,6 +72,15 @@ def test_executor_working_directory(self):
             fs1 = exe.submit(list_files_in_working_directory)
             self.assertEqual(fs1.result(), os.listdir(cwd))
 
+    def test_executor_error(self):
+        cwd = os.path.join(os.path.dirname(__file__), "executables")
+        with FileExecutor(
+            resource_dict={"cwd": cwd}, execute_function=execute_in_subprocess
+        ) as exe:
+            fs1 = exe.submit(get_error, a=1)
+            with self.assertRaises(ValueError):
+                fs1.result()
+
     def test_executor_function(self):
         fs1 = Future()
         q = Queue()

tests/test_singlenodeexecutor_cache.py

Lines changed: 15 additions & 0 deletions
@@ -3,6 +3,7 @@
 import unittest
 
 from executorlib import SingleNodeExecutor
+from executorlib.standalone.serialize import cloudpickle_register
 
 try:
     from executorlib.standalone.hdf import get_cache_data

@@ -12,6 +13,10 @@
     skip_h5py_test = True
 
 
+def get_error(a):
+    raise ValueError(a)
+
+
 @unittest.skipIf(
     skip_h5py_test, "h5py is not installed, so the h5io tests are skipped."
 )

@@ -28,6 +33,16 @@ def test_cache_data(self):
             sum([sum(c["input_args"][0]) for c in cache_lst]), sum(result_lst)
         )
 
+    def test_cache_error(self):
+        cache_directory = "./cache_error"
+        with SingleNodeExecutor(cache_directory=cache_directory) as exe:
+            cloudpickle_register(ind=1)
+            f = exe.submit(get_error, a=1)
+            with self.assertRaises(ValueError):
+                print(f.result())
+
     def tearDown(self):
         if os.path.exists("cache"):
             shutil.rmtree("cache")
+        if os.path.exists("cache_error"):
+            shutil.rmtree("cache_error")

tests/test_standalone_hdf.py

Lines changed: 23 additions & 4 deletions
@@ -39,8 +39,9 @@ def test_hdf_mixed(self):
         self.assertTrue("fn" in data_dict.keys())
         self.assertEqual(data_dict["args"], [a])
         self.assertEqual(data_dict["kwargs"], {"b": b})
-        flag, output = get_output(file_name=file_name)
+        flag, no_error, output = get_output(file_name=file_name)
         self.assertTrue(get_runtime(file_name=file_name) == 0.0)
+        self.assertFalse(no_error)
         self.assertFalse(flag)
         self.assertIsNone(output)

@@ -55,9 +56,10 @@ def test_hdf_args(self):
         self.assertTrue("fn" in data_dict.keys())
         self.assertEqual(data_dict["args"], [a, b])
         self.assertEqual(data_dict["kwargs"], {})
-        flag, output = get_output(file_name=file_name)
+        flag, no_error, output = get_output(file_name=file_name)
         self.assertTrue(get_runtime(file_name=file_name) == 0.0)
         self.assertFalse(flag)
+        self.assertFalse(no_error)
         self.assertIsNone(output)
 
     def test_hdf_kwargs(self):

@@ -80,9 +82,10 @@ def test_hdf_kwargs(self):
         self.assertEqual(data_dict["args"], ())
         self.assertEqual(data_dict["kwargs"], {"a": a, "b": b})
         self.assertEqual(get_queue_id(file_name=file_name), 123)
-        flag, output = get_output(file_name=file_name)
+        flag, no_error, output = get_output(file_name=file_name)
         self.assertTrue(get_runtime(file_name=file_name) == 0.0)
         self.assertFalse(flag)
+        self.assertFalse(no_error)
         self.assertIsNone(output)
 
     def test_hdf_queue_id(self):

@@ -95,11 +98,27 @@ def test_hdf_queue_id(self):
             data_dict={"queue_id": queue_id},
         )
         self.assertEqual(get_queue_id(file_name=file_name), 123)
-        flag, output = get_output(file_name=file_name)
+        flag, no_error, output = get_output(file_name=file_name)
         self.assertTrue(get_runtime(file_name=file_name) == 0.0)
         self.assertFalse(flag)
+        self.assertFalse(no_error)
         self.assertIsNone(output)
 
+    def test_hdf_error(self):
+        cache_directory = os.path.abspath("cache")
+        os.makedirs(cache_directory, exist_ok=True)
+        file_name = os.path.join(cache_directory, "test_error.h5")
+        error = ValueError()
+        dump(
+            file_name=file_name,
+            data_dict={"error": error},
+        )
+        flag, no_error, output = get_output(file_name=file_name)
+        self.assertTrue(get_runtime(file_name=file_name) == 0.0)
+        self.assertTrue(flag)
+        self.assertFalse(no_error)
+        self.assertTrue(isinstance(output, error.__class__))
+
     def tearDown(self):
         if os.path.exists("cache"):
             shutil.rmtree("cache")
