Skip to content

Commit d93e777

Browse files
Interactive: Interrupt interface bootup when the executor is shutdown during bootup (#801)
* Interactive: Interrupt interface bootup when the executor is shutdown during bootup
* [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci
* fix flux interface
* sync changes
* [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci
* fix type hints
* type fixes
* [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci
* fixes
* remove shutdown
* fixes
* [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci
* type fixes
* add tests
* more tests
* fix minimal
* fix interface
* clean up interface
* fixes
* fix type hints
* fix tests
* fix tests
* [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci
* update docstring
* fix
* Add restart_limit to resource_dict
* Add error messages
* [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci
* Use random hash

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent 88e0361 commit d93e777

File tree

11 files changed

+417
-62
lines changed

11 files changed

+417
-62
lines changed

executorlib/executor/flux.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ class FluxJobExecutor(BaseExecutor):
4343
compute nodes. Defaults to False.
4444
- error_log_file (str): Name of the error log file to use for storing exceptions raised
4545
by the Python functions submitted to the Executor.
46+
- restart_limit (int): The maximum number of restarting worker processes. Default: 0
4647
pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
4748
flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
4849
flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function.

executorlib/executor/single.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ def __init__(
120120
only)
121121
- error_log_file (str): Name of the error log file to use for storing exceptions
122122
raised by the Python functions submitted to the Executor.
123+
- restart_limit (int): The maximum number of restarting worker processes. Default: 0
123124
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
124125
context of an HPC cluster this is essential to be able to communicate to an
125126
Executor running on a different compute node within the same allocation. And

executorlib/executor/slurm.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ class SlurmClusterExecutor(BaseExecutor):
4343
- slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
4444
- error_log_file (str): Name of the error log file to use for storing exceptions raised
4545
by the Python functions submitted to the Executor.
46+
- restart_limit (int): The maximum number of restarting worker processes. Default: 0
4647
pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
4748
pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
4849
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the

executorlib/standalone/interactive/communication.py

Lines changed: 36 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import logging
22
import sys
33
from socket import gethostname
4-
from typing import Any, Optional
4+
from typing import Any, Callable, Optional
55

66
import cloudpickle
77
import zmq
@@ -42,6 +42,17 @@ def __init__(
4242
if log_obj_size:
4343
self._logger = logging.getLogger("executorlib")
4444
self._spawner = spawner
45+
self._command_lst: list[str] = []
46+
self._booted_sucessfully: bool = False
47+
self._stop_function: Optional[Callable] = None
48+
49+
@property
50+
def status(self) -> bool:
51+
return self._booted_sucessfully
52+
53+
@status.setter
54+
def status(self, status: bool):
55+
self._booted_sucessfully = status
4556

4657
def send_dict(self, input_dict: dict):
4758
"""
@@ -67,7 +78,9 @@ def receive_dict(self) -> dict:
6778
while len(response_lst) == 0:
6879
response_lst = self._poller.poll(self._time_out_ms)
6980
if not self._spawner.poll():
70-
raise ExecutorlibSocketError()
81+
raise ExecutorlibSocketError(
82+
"SocketInterface crashed during execution."
83+
)
7184
data = self._socket.recv(zmq.NOBLOCK)
7285
if self._logger is not None:
7386
self._logger.warning(
@@ -105,20 +118,30 @@ def bind_to_random_port(self) -> int:
105118

106119
def bootup(
107120
self,
108-
command_lst: list[str],
109-
) -> bool:
121+
command_lst: Optional[list[str]] = None,
122+
stop_function: Optional[Callable] = None,
123+
):
110124
"""
111125
Boot up the client process to connect to the SocketInterface.
112126
113127
Args:
114128
command_lst (list): list of strings to start the client process
115-
116-
Returns:
117-
bool: Whether the interface was successfully started.
129+
stop_function (Callable): Function to stop the interface.
118130
"""
119-
return self._spawner.bootup(
120-
command_lst=command_lst,
121-
)
131+
if command_lst is not None:
132+
self._command_lst = command_lst
133+
if stop_function is not None:
134+
self._stop_function = stop_function
135+
if len(self._command_lst) == 0:
136+
raise ValueError("No command defined to boot up SocketInterface.")
137+
if not self._spawner.bootup(
138+
command_lst=self._command_lst,
139+
stop_function=self._stop_function,
140+
):
141+
self._reset_socket()
142+
self._booted_sucessfully = False
143+
else:
144+
self._booted_sucessfully = True
122145

123146
def shutdown(self, wait: bool = True):
124147
"""
@@ -162,6 +185,7 @@ def interface_bootup(
162185
hostname_localhost: Optional[bool] = None,
163186
log_obj_size: bool = False,
164187
worker_id: Optional[int] = None,
188+
stop_function: Optional[Callable] = None,
165189
) -> SocketInterface:
166190
"""
167191
Start interface for ZMQ communication
@@ -180,6 +204,7 @@ def interface_bootup(
180204
log_obj_size (boolean): Enable debug mode which reports the size of the communicated objects.
181205
worker_id (int): Communicate the worker which ID was assigned to it for future reference and resource
182206
distribution.
207+
stop_function (Callable): Function to stop the interface.
183208
184209
Returns:
185210
executorlib.shared.communication.SocketInterface: socket interface for zmq communication
@@ -203,6 +228,7 @@ def interface_bootup(
203228
]
204229
interface.bootup(
205230
command_lst=command_lst,
231+
stop_function=stop_function,
206232
)
207233
return interface
208234

executorlib/standalone/interactive/spawner.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import os
22
import subprocess
33
from abc import ABC, abstractmethod
4-
from typing import Optional
4+
from typing import Callable, Optional
55

66
MPI_COMMAND = "mpiexec"
77

@@ -29,12 +29,17 @@ def __init__(
2929
def bootup(
3030
self,
3131
command_lst: list[str],
32-
):
32+
stop_function: Optional[Callable] = None,
33+
) -> bool:
3334
"""
3435
Method to start the interface.
3536
3637
Args:
3738
command_lst (list[str]): The command list to execute.
39+
stop_function (Callable): Function to stop the interface.
40+
41+
Returns:
42+
bool: Whether the interface was successfully started.
3843
"""
3944
raise NotImplementedError
4045

@@ -87,12 +92,17 @@ def __init__(
8792
def bootup(
8893
self,
8994
command_lst: list[str],
90-
):
95+
stop_function: Optional[Callable] = None,
96+
) -> bool:
9197
"""
9298
Method to start the subprocess interface.
9399
94100
Args:
95101
command_lst (list[str]): The command list to execute.
102+
stop_function (Callable): Function to stop the interface.
103+
104+
Returns:
105+
bool: Whether the interface was successfully started.
96106
"""
97107
if self._cwd is not None:
98108
os.makedirs(self._cwd, exist_ok=True)

executorlib/task_scheduler/interactive/blockallocation.py

Lines changed: 81 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import queue
2+
import random
23
from concurrent.futures import Future
34
from threading import Thread
45
from typing import Callable, Optional
@@ -8,11 +9,21 @@
89
check_resource_dict,
910
check_resource_dict_is_empty,
1011
)
11-
from executorlib.standalone.interactive.communication import interface_bootup
12+
from executorlib.standalone.interactive.communication import (
13+
ExecutorlibSocketError,
14+
SocketInterface,
15+
interface_bootup,
16+
)
1217
from executorlib.standalone.interactive.spawner import BaseSpawner, MpiExecSpawner
1318
from executorlib.standalone.queue import cancel_items_in_queue
1419
from executorlib.task_scheduler.base import TaskSchedulerBase
15-
from executorlib.task_scheduler.interactive.shared import execute_task_dict, task_done
20+
from executorlib.task_scheduler.interactive.shared import (
21+
execute_task_dict,
22+
reset_task_dict,
23+
task_done,
24+
)
25+
26+
_interrupt_bootup_dict: dict = {}
1627

1728

1829
class BlockAllocationTaskScheduler(TaskSchedulerBase):
@@ -63,11 +74,18 @@ def __init__(
6374
executor_kwargs["queue_join_on_shutdown"] = False
6475
self._process_kwargs = executor_kwargs
6576
self._max_workers = max_workers
77+
self_id = random.getrandbits(128)
78+
self._self_id = self_id
79+
_interrupt_bootup_dict[self._self_id] = False
6680
self._set_process(
6781
process=[
6882
Thread(
6983
target=_execute_multiple_tasks,
70-
kwargs=executor_kwargs | {"worker_id": worker_id},
84+
kwargs=executor_kwargs
85+
| {
86+
"worker_id": worker_id,
87+
"stop_function": lambda: _interrupt_bootup_dict[self_id],
88+
},
7189
)
7290
for worker_id in range(self._max_workers)
7391
],
@@ -157,6 +175,7 @@ def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
157175
if cancel_futures:
158176
cancel_items_in_queue(que=self._future_queue)
159177
if isinstance(self._process, list):
178+
_interrupt_bootup_dict[self._self_id] = True
160179
for _ in range(len(self._process)):
161180
self._future_queue.put({"shutdown": True, "wait": wait})
162181
if wait:
@@ -190,6 +209,8 @@ def _execute_multiple_tasks(
190209
log_obj_size: bool = False,
191210
error_log_file: Optional[str] = None,
192211
worker_id: Optional[int] = None,
212+
stop_function: Optional[Callable] = None,
213+
restart_limit: int = 0,
193214
**kwargs,
194215
) -> None:
195216
"""
@@ -216,6 +237,8 @@ def _execute_multiple_tasks(
216237
submitted to the Executor.
217238
worker_id (int): Communicate the worker which ID was assigned to it for future reference and resource
218239
distribution.
240+
stop_function (Callable): Function to stop the interface.
241+
restart_limit (int): The maximum number of restarting worker processes.
219242
"""
220243
interface = interface_bootup(
221244
command_lst=get_interactive_execute_command(
@@ -225,34 +248,66 @@ def _execute_multiple_tasks(
225248
hostname_localhost=hostname_localhost,
226249
log_obj_size=log_obj_size,
227250
worker_id=worker_id,
251+
stop_function=stop_function,
252+
)
253+
interface_initialization_exception = _set_init_function(
254+
interface=interface,
255+
init_function=init_function,
228256
)
257+
restart_counter = 0
258+
while True:
259+
if not interface.status and restart_counter > restart_limit:
260+
interface.status = True # no more restarts
261+
interface_initialization_exception = ExecutorlibSocketError(
262+
"SocketInterface crashed during execution."
263+
)
264+
elif not interface.status:
265+
interface.bootup()
266+
interface_initialization_exception = _set_init_function(
267+
interface=interface,
268+
init_function=init_function,
269+
)
270+
restart_counter += 1
271+
else: # interface.status == True
272+
task_dict = future_queue.get()
273+
if "shutdown" in task_dict and task_dict["shutdown"]:
274+
if interface.status:
275+
interface.shutdown(wait=task_dict["wait"])
276+
task_done(future_queue=future_queue)
277+
if queue_join_on_shutdown:
278+
future_queue.join()
279+
break
280+
elif "fn" in task_dict and "future" in task_dict:
281+
f = task_dict.pop("future")
282+
if interface_initialization_exception is not None:
283+
f.set_exception(exception=interface_initialization_exception)
284+
else:
285+
# The interface failed during the execution
286+
interface.status = execute_task_dict(
287+
task_dict=task_dict,
288+
future_obj=f,
289+
interface=interface,
290+
cache_directory=cache_directory,
291+
cache_key=cache_key,
292+
error_log_file=error_log_file,
293+
)
294+
if not interface.status:
295+
reset_task_dict(
296+
future_obj=f, future_queue=future_queue, task_dict=task_dict
297+
)
298+
task_done(future_queue=future_queue)
299+
300+
301+
def _set_init_function(
302+
interface: SocketInterface,
303+
init_function: Optional[Callable] = None,
304+
) -> Optional[Exception]:
229305
interface_initialization_exception = None
230-
if init_function is not None:
306+
if init_function is not None and interface.status:
231307
try:
232308
_ = interface.send_and_receive_dict(
233309
input_dict={"init": True, "fn": init_function, "args": (), "kwargs": {}}
234310
)
235311
except Exception as init_exception:
236312
interface_initialization_exception = init_exception
237-
while True:
238-
task_dict = future_queue.get()
239-
if "shutdown" in task_dict and task_dict["shutdown"]:
240-
interface.shutdown(wait=task_dict["wait"])
241-
task_done(future_queue=future_queue)
242-
if queue_join_on_shutdown:
243-
future_queue.join()
244-
break
245-
elif "fn" in task_dict and "future" in task_dict:
246-
f = task_dict.pop("future")
247-
if interface_initialization_exception is not None:
248-
f.set_exception(exception=interface_initialization_exception)
249-
else:
250-
execute_task_dict(
251-
task_dict=task_dict,
252-
future_obj=f,
253-
interface=interface,
254-
cache_directory=cache_directory,
255-
cache_key=cache_key,
256-
error_log_file=error_log_file,
257-
)
258-
task_done(future_queue=future_queue)
313+
return interface_initialization_exception

executorlib/task_scheduler/interactive/onetoone.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@
44
from typing import Optional
55

66
from executorlib.standalone.command import get_interactive_execute_command
7-
from executorlib.standalone.interactive.communication import interface_bootup
7+
from executorlib.standalone.interactive.communication import (
8+
ExecutorlibSocketError,
9+
interface_bootup,
10+
)
811
from executorlib.standalone.interactive.spawner import BaseSpawner, MpiExecSpawner
912
from executorlib.task_scheduler.base import TaskSchedulerBase
1013
from executorlib.task_scheduler.interactive.shared import execute_task_dict
@@ -230,7 +233,7 @@ def _execute_task_in_thread(
230233
error_log_file: Optional[str] = None,
231234
worker_id: Optional[int] = None,
232235
**kwargs,
233-
) -> None:
236+
):
234237
"""
235238
Execute a single task in parallel using the message passing interface (MPI).
236239
@@ -256,7 +259,7 @@ def _execute_task_in_thread(
256259
worker_id (int): Communicate the worker which ID was assigned to it for future reference and resource
257260
distribution.
258261
"""
259-
execute_task_dict(
262+
if not execute_task_dict(
260263
task_dict=task_dict,
261264
future_obj=future_obj,
262265
interface=interface_bootup(
@@ -271,4 +274,7 @@ def _execute_task_in_thread(
271274
cache_directory=cache_directory,
272275
cache_key=cache_key,
273276
error_log_file=error_log_file,
274-
)
277+
):
278+
future_obj.set_exception(
279+
ExecutorlibSocketError("SocketInterface crashed during execution.")
280+
)

0 commit comments

Comments
 (0)