Skip to content

Commit 9bacadf

Browse files
jan-janssenpyiron-runnerpre-commit-ci[bot]
authored
Worker for overflow queue (#763)
* Move scheduler to standalone * fix subprocess spawner docstring * file executor fix parallel execution * add command tests * move slurm command to standalone * implement spawner for pysqa * transfer changes * Format black * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * block_allocation * Format black * fix type hint * implement additional options for SLURM * Format black * fixes * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * add test for flux block allocation * fixes * more fixes * fixes * handle different types * fixes * Add print commands * Format black * hash for worker directory * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Update test_fluxclusterexecutor.py * fixes * fix test * only receive jobs when worker is running * fix job resubmission * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * fix type hints * restart workers after they were killed * Format black * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * type fixes * helper function * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * introduce restart function * fix spelling * shutdown on del * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * type fixes * Introduce stop function (#791) * all tasks are stopped with stop function * Format black * add additional break * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * fix typing * fixes * shutdown * restructure * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * the interface can only be none when it was cancelled before it started * fix type hints * fixes * 
[pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * be more explizit with types --------- Co-authored-by: pyiron-runner <[email protected]> Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> * merge changes * fix docstring * fixes * fix types * consistent naming scheme * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * remove duplicated task_done() call * fixes * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * cancel items in queue * fixes * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * fix return * fix duplicated arguments * resort * remove unused statement * rename variable * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Update shared.py * Update blockallocation.py * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Add docstrings * test for generate_command() * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Add more tests * smaller tests * submit a big job * extend tests * no command * remove error test * extend tests * change error name * check more errors * clean up * extend tests * more tests * validate initialization * fix test --------- Co-authored-by: pyiron-runner <[email protected]> Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent d93e777 commit 9bacadf

File tree

6 files changed

+464
-40
lines changed

6 files changed

+464
-40
lines changed

executorlib/executor/flux.py

Lines changed: 40 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -358,28 +358,48 @@ def __init__(
358358
if not plot_dependency_graph:
359359
import pysqa # noqa
360360

361-
from executorlib.task_scheduler.file.task_scheduler import (
362-
create_file_executor,
363-
)
361+
if block_allocation:
362+
from executorlib.task_scheduler.interactive.spawner_pysqa import (
363+
create_pysqa_block_allocation_scheduler,
364+
)
364365

365-
super().__init__(
366-
executor=create_file_executor(
367-
max_workers=max_workers,
368-
backend="flux",
369-
max_cores=max_cores,
370-
cache_directory=cache_directory,
371-
resource_dict=resource_dict,
372-
flux_executor=None,
373-
pmi_mode=pmi_mode,
374-
flux_executor_nesting=False,
375-
flux_log_files=False,
376-
pysqa_config_directory=pysqa_config_directory,
377-
hostname_localhost=hostname_localhost,
378-
block_allocation=block_allocation,
379-
init_function=init_function,
380-
disable_dependencies=disable_dependencies,
366+
super().__init__(
367+
executor=create_pysqa_block_allocation_scheduler(
368+
max_cores=max_cores,
369+
cache_directory=cache_directory,
370+
hostname_localhost=hostname_localhost,
371+
log_obj_size=log_obj_size,
372+
pmi_mode=pmi_mode,
373+
init_function=init_function,
374+
max_workers=max_workers,
375+
resource_dict=resource_dict,
376+
pysqa_config_directory=pysqa_config_directory,
377+
backend="flux",
378+
)
379+
)
380+
else:
381+
from executorlib.task_scheduler.file.task_scheduler import (
382+
create_file_executor,
383+
)
384+
385+
super().__init__(
386+
executor=create_file_executor(
387+
max_workers=max_workers,
388+
backend="flux",
389+
max_cores=max_cores,
390+
cache_directory=cache_directory,
391+
resource_dict=resource_dict,
392+
flux_executor=None,
393+
pmi_mode=pmi_mode,
394+
flux_executor_nesting=False,
395+
flux_log_files=False,
396+
pysqa_config_directory=pysqa_config_directory,
397+
hostname_localhost=hostname_localhost,
398+
block_allocation=block_allocation,
399+
init_function=init_function,
400+
disable_dependencies=disable_dependencies,
401+
)
381402
)
382-
)
383403
else:
384404
super().__init__(
385405
executor=DependencyTaskScheduler(

executorlib/executor/slurm.py

Lines changed: 41 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -166,28 +166,49 @@ def __init__(
166166
if not plot_dependency_graph:
167167
import pysqa # noqa
168168

169-
from executorlib.task_scheduler.file.task_scheduler import (
170-
create_file_executor,
171-
)
169+
if block_allocation:
170+
from executorlib.task_scheduler.interactive.spawner_pysqa import (
171+
create_pysqa_block_allocation_scheduler,
172+
)
172173

173-
super().__init__(
174-
executor=create_file_executor(
175-
max_workers=max_workers,
176-
backend="slurm",
177-
max_cores=max_cores,
178-
cache_directory=cache_directory,
179-
resource_dict=resource_dict,
180-
pmi_mode=pmi_mode,
181-
flux_executor=None,
182-
flux_executor_nesting=False,
183-
flux_log_files=False,
184-
pysqa_config_directory=pysqa_config_directory,
185-
hostname_localhost=hostname_localhost,
186-
block_allocation=block_allocation,
187-
init_function=init_function,
188-
disable_dependencies=disable_dependencies,
174+
super().__init__(
175+
executor=create_pysqa_block_allocation_scheduler(
176+
max_cores=max_cores,
177+
cache_directory=cache_directory,
178+
hostname_localhost=hostname_localhost,
179+
log_obj_size=log_obj_size,
180+
pmi_mode=pmi_mode,
181+
init_function=init_function,
182+
max_workers=max_workers,
183+
resource_dict=resource_dict,
184+
pysqa_config_directory=pysqa_config_directory,
185+
backend="slurm",
186+
),
187+
)
188+
189+
else:
190+
from executorlib.task_scheduler.file.task_scheduler import (
191+
create_file_executor,
192+
)
193+
194+
super().__init__(
195+
executor=create_file_executor(
196+
max_workers=max_workers,
197+
backend="slurm",
198+
max_cores=max_cores,
199+
cache_directory=cache_directory,
200+
resource_dict=resource_dict,
201+
pmi_mode=pmi_mode,
202+
flux_executor=None,
203+
flux_executor_nesting=False,
204+
flux_log_files=False,
205+
pysqa_config_directory=pysqa_config_directory,
206+
hostname_localhost=hostname_localhost,
207+
block_allocation=block_allocation,
208+
init_function=init_function,
209+
disable_dependencies=disable_dependencies,
210+
)
189211
)
190-
)
191212
else:
192213
super().__init__(
193214
executor=DependencyTaskScheduler(
Lines changed: 248 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,248 @@
1+
import hashlib
2+
import os
3+
from time import sleep
4+
from typing import Callable, Optional
5+
6+
from pysqa import QueueAdapter
7+
8+
from executorlib.standalone.inputcheck import validate_number_of_cores
9+
from executorlib.standalone.interactive.spawner import BaseSpawner
10+
from executorlib.standalone.scheduler import pysqa_execute_command, terminate_with_pysqa
11+
from executorlib.task_scheduler.interactive.blockallocation import (
12+
BlockAllocationTaskScheduler,
13+
)
14+
15+
16+
class PysqaSpawner(BaseSpawner):
    """Spawner that launches worker processes through pysqa on a queuing system (SLURM or flux)."""

    def __init__(
        self,
        cwd: Optional[str] = None,
        cores: int = 1,
        threads_per_core: int = 1,
        gpus_per_core: int = 0,
        num_nodes: Optional[int] = None,
        exclusive: bool = False,
        openmpi_oversubscribe: bool = False,
        slurm_cmd_args: Optional[list[str]] = None,
        pmi_mode: Optional[str] = None,
        config_directory: Optional[str] = None,
        backend: Optional[str] = None,
        **kwargs,
    ):
        """
        Pysqa-based spawner implementation.

        Args:
            cwd (str, optional): The current working directory. Defaults to None.
            cores (int): The number of cores to use. Defaults to 1.
            threads_per_core (int): The number of threads per core. Defaults to 1.
            gpus_per_core (int): number of GPUs per worker - defaults to 0
            num_nodes (int, optional): The number of compute nodes to use for executing the task. Defaults to None.
            exclusive (bool): Whether to exclusively reserve the compute nodes, or allow sharing compute notes. Defaults
                              to False.
            openmpi_oversubscribe (bool): Whether to oversubscribe the cores. Defaults to False.
            slurm_cmd_args (list, optional): Additional command line arguments for the srun call (SLURM only)
            pmi_mode (str, optional): PMI interface to use (OpenMPI v5 requires pmix) default is None
            config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
            backend (str): name of the backend used to spawn tasks.
            **kwargs: additional keyword arguments forwarded to pysqa's submit_job() call.
        """
        super().__init__(
            cwd=cwd,
            cores=cores,
            openmpi_oversubscribe=openmpi_oversubscribe,
        )
        self._threads_per_core = threads_per_core
        self._gpus_per_core = gpus_per_core
        self._num_nodes = num_nodes
        self._exclusive = exclusive
        self._slurm_cmd_args = slurm_cmd_args
        self._pmi_mode = pmi_mode
        self._config_directory = config_directory
        self._backend = backend
        self._pysqa_submission_kwargs = kwargs
        # queue id of the submitted worker job; None until bootup() submitted it
        self._process: Optional[int] = None
        self._queue_adapter: Optional[QueueAdapter] = None

    def bootup(
        self,
        command_lst: list[str],
        stop_function: Optional[Callable] = None,
    ) -> bool:
        """
        Submit the worker job to the queuing system and block until it is running.

        Args:
            command_lst (list[str]): The command list to execute.
            stop_function (Callable): Function to stop the interface.

        Returns:
            bool: Whether the interface was successfully started.
        """
        self._queue_adapter = QueueAdapter(
            directory=self._config_directory,
            queue_type=self._backend,
            execute_command=pysqa_execute_command,
        )
        self._process = self._start_process_helper(
            command_lst=command_lst,
            queue_adapter=self._queue_adapter,
        )
        while True:
            if self._check_process_helper(command_lst=command_lst):
                return True
            elif stop_function is not None and stop_function():
                # shutdown was requested before the job reached the running state
                self.shutdown(wait=True)
                return False
            else:
                sleep(1)  # Wait for the process to start

    def generate_command(self, command_lst: list[str]) -> list[str]:
        """
        Prepend the backend specific parallel launcher (srun / flux run) to the command list.

        Args:
            command_lst (list[str]): The command list.

        Returns:
            list[str]: The generated command list.

        Raises:
            ValueError: If more than one core is requested with an unsupported backend, or
                if a SLURM-only resource option is combined with the flux backend.
        """
        if self._cores > 1 and self._backend == "slurm":
            command_prepend = ["srun", "-n", str(self._cores)]
            if self._pmi_mode is not None:
                command_prepend += ["--mpi=" + self._pmi_mode]
            if self._num_nodes is not None:
                command_prepend += ["-N", str(self._num_nodes)]
            if self._threads_per_core > 1:
                command_prepend += ["--cpus-per-task=" + str(self._threads_per_core)]
            if self._gpus_per_core > 0:
                command_prepend += ["--gpus-per-task=" + str(self._gpus_per_core)]
            if self._exclusive:
                command_prepend += ["--exact"]
            if self._openmpi_oversubscribe:
                command_prepend += ["--oversubscribe"]
            if self._slurm_cmd_args is not None and len(self._slurm_cmd_args) > 0:
                command_prepend += self._slurm_cmd_args
        elif self._cores > 1 and self._backend == "flux":
            command_prepend = ["flux", "run", "-n", str(self._cores)]
            if self._pmi_mode is not None:
                command_prepend += ["-o", "pmi=" + self._pmi_mode]
            # The remaining resource options are SLURM specific; fail with an
            # explicit message instead of a bare ValueError().
            if self._num_nodes is not None:
                raise ValueError("num_nodes is not supported by the flux backend.")
            if self._threads_per_core > 1:
                raise ValueError(
                    "threads_per_core is not supported by the flux backend."
                )
            if self._gpus_per_core > 0:
                raise ValueError("gpus_per_core is not supported by the flux backend.")
            if self._exclusive:
                raise ValueError("exclusive is not supported by the flux backend.")
            if self._openmpi_oversubscribe:
                raise ValueError(
                    "openmpi_oversubscribe is not supported by the flux backend."
                )
        elif self._cores > 1:
            # bug fix: the previous message claimed backend=None was acceptable,
            # yet backend=None with cores > 1 lands exactly in this branch -
            # parallel workers require either slurm or flux.
            raise ValueError(
                f"backend should be slurm or flux for cores > 1, not {self._backend}"
            )
        else:
            command_prepend = []
        return command_prepend + command_lst

    def shutdown(self, wait: bool = True):
        """
        Cancel the worker job on the queuing system.

        Args:
            wait (bool, optional): Whether to wait for the interface to shutdown. Defaults to True.
                NOTE(review): currently unused - pysqa cancellation is synchronous; the
                parameter is kept for interface compatibility with other spawners.
        """
        if self._process is not None:
            terminate_with_pysqa(
                queue_id=self._process,
                config_directory=self._config_directory,
                backend=self._backend,
            )
            self._process = None

    def poll(self) -> bool:
        """
        Check if the worker job is still alive on the queuing system.

        Returns:
            bool: True if the job is running or pending, False otherwise.
        """
        if self._process is not None and self._queue_adapter is not None:
            status = self._queue_adapter.get_status_of_job(process_id=self._process)
            return status in ["running", "pending"]
        else:
            return False

    def _start_process_helper(
        self, command_lst: list[str], queue_adapter: QueueAdapter
    ) -> int:
        """Submit the worker command via pysqa and return its queue id."""
        # str(self) contains the instance id, so each spawner submits from its
        # own unique working directory (renamed from `hash` - shadowed builtin)
        job_hash = hashlib.md5(str(self).encode()).hexdigest()
        if self._cwd is not None:
            working_directory = os.path.join(self._cwd, job_hash)
        else:
            working_directory = os.path.abspath(job_hash)
        return queue_adapter.submit_job(
            command=" ".join(self.generate_command(command_lst=command_lst)),
            working_directory=working_directory,
            cores=int(self._cores * self._threads_per_core),
            **self._pysqa_submission_kwargs,
        )

    def _check_process_helper(self, command_lst: list[str]) -> bool:
        """
        Check the queue status of the submitted worker job.

        Returns True once the job is running and False while it is still pending
        (resubmitting it first if it ended up in an error state). Raises a
        RuntimeError when the job can no longer be found on the queuing system.
        """
        if self._queue_adapter is not None:
            status = self._queue_adapter.get_status_of_job(process_id=self._process)
        else:
            status = None
        if status == "running":
            return True
        elif status is None:
            raise RuntimeError(
                f"Failed to start the process with command: {command_lst}"
            )
        elif status == "error":
            # the job failed before reaching the running state - resubmit it
            self._process = self._start_process_helper(
                command_lst=command_lst, queue_adapter=self._queue_adapter
            )
        return False

    def __del__(self):
        # best-effort clean-up: cancel the queued worker job when the spawner
        # object is garbage collected
        self.shutdown(wait=True)
210+
211+
def create_pysqa_block_allocation_scheduler(
    max_cores: Optional[int] = None,
    cache_directory: Optional[str] = None,
    hostname_localhost: Optional[bool] = None,
    log_obj_size: bool = False,
    pmi_mode: Optional[str] = None,
    init_function: Optional[Callable] = None,
    max_workers: Optional[int] = None,
    resource_dict: Optional[dict] = None,
    pysqa_config_directory: Optional[str] = None,
    backend: Optional[str] = None,
):
    """
    Create a BlockAllocationTaskScheduler whose workers are submitted via pysqa.

    Args:
        max_cores (int, optional): maximum number of cores available to the scheduler.
        cache_directory (str, optional): directory for cached results; defaults to the
            current working directory when not set.
        hostname_localhost (bool, optional): whether to use localhost as hostname.
        log_obj_size (bool): whether to log the size of communicated objects. Defaults to False.
        pmi_mode (str, optional): PMI interface to use (OpenMPI v5 requires pmix) default is None
        init_function (Callable, optional): function executed once on each worker at startup.
        max_workers (int, optional): maximum number of workers; validated against max_cores.
        resource_dict (dict, optional): per-worker resource settings (e.g. "cores", "cwd").
        pysqa_config_directory (str, optional): path to the pysqa config directory.
        backend (str, optional): name of the pysqa backend used to spawn tasks.

    Returns:
        BlockAllocationTaskScheduler: scheduler that spawns workers with PysqaSpawner.
    """
    if resource_dict is None:
        resource_dict = {}
    else:
        # bug fix: work on a shallow copy so the caller's dictionary is not
        # mutated by the key assignments below
        resource_dict = dict(resource_dict)
    cores_per_worker = resource_dict.get("cores", 1)
    if "cwd" in resource_dict and resource_dict["cwd"] is not None:
        resource_dict["cwd"] = os.path.abspath(resource_dict["cwd"])
    if cache_directory is not None:
        resource_dict["cache_directory"] = os.path.abspath(cache_directory)
    else:
        resource_dict["cache_directory"] = os.path.abspath(".")
    resource_dict["hostname_localhost"] = hostname_localhost
    resource_dict["log_obj_size"] = log_obj_size
    resource_dict["pmi_mode"] = pmi_mode
    resource_dict["init_function"] = init_function
    resource_dict["config_directory"] = pysqa_config_directory
    resource_dict["backend"] = backend
    max_workers = validate_number_of_cores(
        max_cores=max_cores,
        max_workers=max_workers,
        cores_per_worker=cores_per_worker,
        set_local_cores=False,
    )
    return BlockAllocationTaskScheduler(
        max_workers=max_workers,
        executor_kwargs=resource_dict,
        spawner=PysqaSpawner,
    )

0 commit comments

Comments
 (0)