Skip to content

Commit 61e1d7c

Browse files
authored
Debug error handling (#577)
* Debug error handling * skip tests with lower python versions * Update shared.py * Update shared.py * fix tests * Update shared.py * Update shared.py * Update test_dependencies_executor.py
1 parent b1145f3 commit 61e1d7c

File tree

2 files changed

+206
-7
lines changed

2 files changed

+206
-7
lines changed

executorlib/interactive/shared.py

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import queue
44
import sys
55
import time
6+
from asyncio.exceptions import CancelledError
67
from concurrent.futures import Future
78
from time import sleep
89
from typing import Any, Callable, Optional, Union
@@ -361,7 +362,10 @@ def execute_tasks_with_dependencies(
361362
task_dict is not None and "fn" in task_dict and "future" in task_dict
362363
):
363364
future_lst, ready_flag = _get_future_objects_from_input(task_dict=task_dict)
364-
if len(future_lst) == 0 or ready_flag:
365+
exception_lst = _get_exception_lst(future_lst=future_lst)
366+
if len(exception_lst) > 0:
367+
task_dict["future"].set_exception(exception_lst[0])
368+
elif len(future_lst) == 0 or ready_flag:
365369
# No future objects are used in the input or all future objects are already done
366370
task_dict["args"], task_dict["kwargs"] = _update_futures_in_input(
367371
args=task_dict["args"], kwargs=task_dict["kwargs"]
@@ -455,7 +459,10 @@ def _submit_waiting_task(wait_lst: list[dict], executor_queue: queue.Queue) -> l
455459
"""
456460
wait_tmp_lst = []
457461
for task_wait_dict in wait_lst:
458-
if all(future.done() for future in task_wait_dict["future_lst"]):
462+
exception_lst = _get_exception_lst(future_lst=task_wait_dict["future_lst"])
463+
if len(exception_lst) > 0:
464+
task_wait_dict["future"].set_exception(exception_lst[0])
465+
elif all(future.done() for future in task_wait_dict["future_lst"]):
459466
del task_wait_dict["future_lst"]
460467
task_wait_dict["args"], task_wait_dict["kwargs"] = _update_futures_in_input(
461468
args=task_wait_dict["args"], kwargs=task_wait_dict["kwargs"]
@@ -663,3 +670,17 @@ def _execute_task_with_cache(
663670
future = task_dict["future"]
664671
future.set_result(result)
665672
future_queue.task_done()
673+
674+
675+
def _get_exception_lst(future_lst: list) -> list:
676+
def get_exception(future_obj: Future) -> bool:
677+
try:
678+
excp = future_obj.exception(timeout=10**-10)
679+
return excp is not None and not isinstance(excp, CancelledError)
680+
except TimeoutError:
681+
return False
682+
683+
if sys.version_info[0] >= 3 and sys.version_info[1] >= 11:
684+
return [f.exception() for f in future_lst if get_exception(future_obj=f)]
685+
else:
686+
return []

tests/test_dependencies_executor.py

Lines changed: 183 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from concurrent.futures import Future
22
import unittest
3+
import sys
34
from time import sleep
45
from queue import Queue
56

@@ -42,7 +43,7 @@ def return_input_dict(input_dict):
4243
return input_dict
4344

4445

45-
def raise_error():
46+
def raise_error(parameter):
4647
raise RuntimeError
4748

4849

@@ -106,6 +107,119 @@ def test_dependency_steps(self):
106107
self.assertTrue(fs2.done())
107108
q.put({"shutdown": True, "wait": True})
108109

110+
@unittest.skipIf(
111+
condition=not (sys.version_info[0] >= 3 and sys.version_info[1] >= 11),
112+
reason="requires Python 3.11 or higher",
113+
)
114+
def test_dependency_steps_error(self):
115+
cloudpickle_register(ind=1)
116+
fs1 = Future()
117+
fs2 = Future()
118+
q = Queue()
119+
q.put(
120+
{
121+
"fn": raise_error,
122+
"args": (),
123+
"kwargs": {"parameter": 0},
124+
"future": fs1,
125+
"resource_dict": {"cores": 1},
126+
}
127+
)
128+
q.put(
129+
{
130+
"fn": add_function,
131+
"args": (),
132+
"kwargs": {"parameter_1": 1, "parameter_2": fs1},
133+
"future": fs2,
134+
"resource_dict": {"cores": 1},
135+
}
136+
)
137+
executor = create_single_node_executor(
138+
max_workers=1,
139+
max_cores=2,
140+
resource_dict={
141+
"cores": 1,
142+
"threads_per_core": 1,
143+
"gpus_per_core": 0,
144+
"cwd": None,
145+
"openmpi_oversubscribe": False,
146+
"slurm_cmd_args": [],
147+
},
148+
)
149+
process = RaisingThread(
150+
target=execute_tasks_with_dependencies,
151+
kwargs={
152+
"future_queue": q,
153+
"executor_queue": executor._future_queue,
154+
"executor": executor,
155+
"refresh_rate": 0.01,
156+
},
157+
)
158+
process.start()
159+
self.assertFalse(fs1.done())
160+
self.assertFalse(fs2.done())
161+
self.assertTrue(fs1.exception() is not None)
162+
self.assertTrue(fs2.exception() is not None)
163+
with self.assertRaises(RuntimeError):
164+
fs2.result()
165+
q.put({"shutdown": True, "wait": True})
166+
167+
@unittest.skipIf(
168+
condition=not (sys.version_info[0] >= 3 and sys.version_info[1] >= 11),
169+
reason="requires Python 3.11 or higher",
170+
)
171+
def test_dependency_steps_error_before(self):
172+
cloudpickle_register(ind=1)
173+
fs1 = Future()
174+
fs1.set_exception(RuntimeError())
175+
fs2 = Future()
176+
q = Queue()
177+
q.put(
178+
{
179+
"fn": add_function,
180+
"args": (),
181+
"kwargs": {"parameter_1": 1, "parameter_2": 2},
182+
"future": fs1,
183+
"resource_dict": {"cores": 1},
184+
}
185+
)
186+
q.put(
187+
{
188+
"fn": add_function,
189+
"args": (),
190+
"kwargs": {"parameter_1": 1, "parameter_2": fs1},
191+
"future": fs2,
192+
"resource_dict": {"cores": 1},
193+
}
194+
)
195+
executor = create_single_node_executor(
196+
max_workers=1,
197+
max_cores=2,
198+
resource_dict={
199+
"cores": 1,
200+
"threads_per_core": 1,
201+
"gpus_per_core": 0,
202+
"cwd": None,
203+
"openmpi_oversubscribe": False,
204+
"slurm_cmd_args": [],
205+
},
206+
)
207+
process = RaisingThread(
208+
target=execute_tasks_with_dependencies,
209+
kwargs={
210+
"future_queue": q,
211+
"executor_queue": executor._future_queue,
212+
"executor": executor,
213+
"refresh_rate": 0.01,
214+
},
215+
)
216+
process.start()
217+
self.assertTrue(fs1.exception() is not None)
218+
self.assertTrue(fs2.exception() is not None)
219+
with self.assertRaises(RuntimeError):
220+
fs2.result()
221+
q.put({"shutdown": True, "wait": True})
222+
109223
def test_many_to_one(self):
110224
length = 5
111225
parameter = 1
@@ -148,22 +262,86 @@ def test_block_allocation_false_one_worker(self):
148262
with self.assertRaises(RuntimeError):
149263
with SingleNodeExecutor(max_cores=1, block_allocation=False) as exe:
150264
cloudpickle_register(ind=1)
151-
_ = exe.submit(raise_error)
265+
_ = exe.submit(raise_error, parameter=0)
152266

153267
def test_block_allocation_true_one_worker(self):
154268
with self.assertRaises(RuntimeError):
155269
with SingleNodeExecutor(max_cores=1, block_allocation=True) as exe:
156270
cloudpickle_register(ind=1)
157-
_ = exe.submit(raise_error)
271+
_ = exe.submit(raise_error, parameter=0)
158272

159273
def test_block_allocation_false_two_workers(self):
160274
with self.assertRaises(RuntimeError):
161275
with SingleNodeExecutor(max_cores=2, block_allocation=False) as exe:
162276
cloudpickle_register(ind=1)
163-
_ = exe.submit(raise_error)
277+
_ = exe.submit(raise_error, parameter=0)
164278

165279
def test_block_allocation_true_two_workers(self):
166280
with self.assertRaises(RuntimeError):
167281
with SingleNodeExecutor(max_cores=2, block_allocation=True) as exe:
168282
cloudpickle_register(ind=1)
169-
_ = exe.submit(raise_error)
283+
_ = exe.submit(raise_error, parameter=0)
284+
285+
@unittest.skipIf(
286+
condition=not (sys.version_info[0] >= 3 and sys.version_info[1] >= 11),
287+
reason="requires Python 3.11 or higher",
288+
)
289+
def test_block_allocation_false_one_worker_loop(self):
290+
with self.assertRaises(RuntimeError):
291+
with SingleNodeExecutor(max_cores=1, block_allocation=False) as exe:
292+
cloudpickle_register(ind=1)
293+
lst = []
294+
for i in range(1, 4):
295+
lst = exe.submit(
296+
raise_error,
297+
parameter=lst,
298+
)
299+
lst.result()
300+
301+
@unittest.skipIf(
302+
condition=not (sys.version_info[0] >= 3 and sys.version_info[1] >= 11),
303+
reason="requires Python 3.11 or higher",
304+
)
305+
def test_block_allocation_true_one_worker_loop(self):
306+
with self.assertRaises(RuntimeError):
307+
with SingleNodeExecutor(max_cores=1, block_allocation=True) as exe:
308+
cloudpickle_register(ind=1)
309+
lst = []
310+
for i in range(1, 4):
311+
lst = exe.submit(
312+
raise_error,
313+
parameter=lst,
314+
)
315+
lst.result()
316+
317+
@unittest.skipIf(
318+
condition=not (sys.version_info[0] >= 3 and sys.version_info[1] >= 11),
319+
reason="requires Python 3.11 or higher",
320+
)
321+
def test_block_allocation_false_two_workers_loop(self):
322+
with self.assertRaises(RuntimeError):
323+
with SingleNodeExecutor(max_cores=2, block_allocation=False) as exe:
324+
cloudpickle_register(ind=1)
325+
lst = []
326+
for i in range(1, 4):
327+
lst = exe.submit(
328+
raise_error,
329+
parameter=lst,
330+
)
331+
lst.result()
332+
333+
@unittest.skipIf(
334+
condition=not (sys.version_info[0] >= 3 and sys.version_info[1] >= 11),
335+
reason="requires Python 3.11 or higher",
336+
)
337+
def test_block_allocation_true_two_workers_loop(self):
338+
with self.assertRaises(RuntimeError):
339+
with SingleNodeExecutor(max_cores=2, block_allocation=True) as exe:
340+
cloudpickle_register(ind=1)
341+
lst = []
342+
for i in range(1, 4):
343+
lst = exe.submit(
344+
raise_error,
345+
parameter=lst,
346+
)
347+
lst.result()

0 commit comments

Comments
 (0)