Skip to content

Commit ba0f8ab

Browse files
Refactor Task scheduler (#637)
* refactor internal interfaces
* split task scheduler and executor
* major refactoring
* [pre-commit.ci] auto fixes from pre-commit.com hooks — for more information, see https://pre-commit.ci
* fix backends
* [pre-commit.ci] auto fixes from pre-commit.com hooks — for more information, see https://pre-commit.ci
* fix imports
* fix hidden imports
* [pre-commit.ci] auto fixes from pre-commit.com hooks — for more information, see https://pre-commit.ci
* Update documentation
* revert file to cache
* rename cache file
* another fix
* last fixes

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent f3c99a1 commit ba0f8ab

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

41 files changed

+812
-271
lines changed

executorlib/__init__.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
from executorlib._version import get_versions as _get_versions
2-
from executorlib.interfaces.flux import (
2+
from executorlib.executor.flux import (
33
FluxClusterExecutor,
44
FluxJobExecutor,
55
)
6-
from executorlib.interfaces.single import SingleNodeExecutor
7-
from executorlib.interfaces.slurm import (
6+
from executorlib.executor.single import SingleNodeExecutor
7+
from executorlib.executor.slurm import (
88
SlurmClusterExecutor,
99
SlurmJobExecutor,
1010
)

executorlib/backend/cache_parallel.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@
44

55
import cloudpickle
66

7-
from executorlib.cache.backend import backend_load_file, backend_write_file
7+
from executorlib.task_scheduler.file.backend import (
8+
backend_load_file,
9+
backend_write_file,
10+
)
811

912

1013
def main() -> None:
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import sys
22

3-
from executorlib.cache.backend import backend_execute_task_in_file
3+
from executorlib.task_scheduler.file.backend import backend_execute_task_in_file
44

55
if __name__ == "__main__":
66
backend_execute_task_in_file(file_name=sys.argv[1])
File renamed without changes.

executorlib/executor/base.py

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
import queue
2+
from concurrent.futures import (
3+
Executor as FutureExecutor,
4+
)
5+
from concurrent.futures import (
6+
Future,
7+
)
8+
from typing import Callable, Optional
9+
10+
from executorlib.task_scheduler.base import TaskSchedulerBase
11+
12+
13+
class ExecutorBase(FutureExecutor):
14+
"""
15+
Interface class for the executor.
16+
17+
Args:
18+
executor (TaskSchedulerBase): internal executor
19+
"""
20+
21+
def __init__(self, executor: TaskSchedulerBase):
22+
self._task_scheduler = executor
23+
24+
@property
25+
def max_workers(self) -> Optional[int]:
26+
return self._task_scheduler.max_workers
27+
28+
@max_workers.setter
29+
def max_workers(self, max_workers: int):
30+
self._task_scheduler.max_workers = max_workers
31+
32+
@property
33+
def info(self) -> Optional[dict]:
34+
"""
35+
Get the information about the executor.
36+
37+
Returns:
38+
Optional[dict]: Information about the executor.
39+
"""
40+
return self._task_scheduler.info
41+
42+
@property
43+
def future_queue(self) -> Optional[queue.Queue]:
44+
"""
45+
Get the future queue.
46+
47+
Returns:
48+
queue.Queue: The future queue.
49+
"""
50+
return self._task_scheduler.future_queue
51+
52+
def submit( # type: ignore
53+
self,
54+
fn: Callable,
55+
/,
56+
*args,
57+
resource_dict: Optional[dict] = None,
58+
**kwargs,
59+
) -> Future:
60+
"""
61+
Submits a callable to be executed with the given arguments.
62+
63+
Schedules the callable to be executed as fn(*args, **kwargs) and returns
64+
a Future instance representing the execution of the callable.
65+
66+
Args:
67+
fn (callable): function to submit for execution
68+
args: arguments for the submitted function
69+
kwargs: keyword arguments for the submitted function
70+
resource_dict (dict): resource dictionary, which defines the resources used for the execution of the
71+
function. Example resource dictionary: {
72+
cores: 1,
73+
threads_per_core: 1,
74+
gpus_per_worker: 0,
75+
oversubscribe: False,
76+
cwd: None,
77+
executor: None,
78+
hostname_localhost: False,
79+
}
80+
81+
Returns:
82+
Future: A Future representing the given call.
83+
"""
84+
return self._task_scheduler.submit(
85+
*([fn] + list(args)), resource_dict=resource_dict, **kwargs
86+
)
87+
88+
def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
89+
"""
90+
Clean-up the resources associated with the Executor.
91+
92+
It is safe to call this method several times. Otherwise, no other
93+
methods can be called after this one.
94+
95+
Args:
96+
wait (bool): If True then shutdown will not return until all running
97+
futures have finished executing and the resources used by the
98+
parallel_executors have been reclaimed.
99+
cancel_futures (bool): If True then shutdown will cancel all pending
100+
futures. Futures that are completed or running will not be
101+
cancelled.
102+
"""
103+
self._task_scheduler.shutdown(wait=wait, cancel_futures=cancel_futures)
104+
105+
def __len__(self) -> int:
106+
"""
107+
Get the length of the executor.
108+
109+
Returns:
110+
int: The length of the executor.
111+
"""
112+
return len(self._task_scheduler)
113+
114+
def __exit__(self, *args, **kwargs) -> None:
115+
"""
116+
Exit method called when exiting the context manager.
117+
"""
118+
self._task_scheduler.__exit__(*args, **kwargs)

executorlib/interfaces/flux.py renamed to executorlib/executor/flux.py

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,7 @@
11
import contextlib
22
from typing import Callable, Optional, Union
33

4-
from executorlib.base.executor import ExecutorInterface
5-
from executorlib.interactive.blockallocation import BlockAllocationExecutor
6-
from executorlib.interactive.dependency import DependencyExecutor
7-
from executorlib.interactive.onetoone import OneTaskPerProcessExecutor
4+
from executorlib.executor.base import ExecutorBase
85
from executorlib.standalone.inputcheck import (
96
check_command_line_argument_lst,
107
check_init_function,
@@ -14,15 +11,20 @@
1411
check_refresh_rate,
1512
validate_number_of_cores,
1613
)
14+
from executorlib.task_scheduler.interactive.blockallocation import (
15+
BlockAllocationTaskScheduler,
16+
)
17+
from executorlib.task_scheduler.interactive.dependency import DependencyTaskScheduler
18+
from executorlib.task_scheduler.interactive.onetoone import OneProcessTaskScheduler
1719

1820
with contextlib.suppress(ImportError):
19-
from executorlib.interactive.fluxspawner import (
21+
from executorlib.task_scheduler.interactive.fluxspawner import (
2022
FluxPythonSpawner,
2123
validate_max_workers,
2224
)
2325

2426

25-
class FluxJobExecutor(ExecutorInterface):
27+
class FluxJobExecutor(ExecutorBase):
2628
"""
2729
The executorlib.Executor leverages either the message passing interface (MPI), the SLURM workload manager or
2830
preferable the flux framework for distributing python functions within a given resource allocation. In contrast to
@@ -70,7 +72,7 @@ class FluxJobExecutor(ExecutorInterface):
7072
Examples:
7173
```
7274
>>> import numpy as np
73-
>>> from executorlib.interfaces.flux import FluxJobExecutor
75+
>>> from executorlib.executor.flux import FluxJobExecutor
7476
>>>
7577
>>> def calc(i, j, k):
7678
>>> from mpi4py import MPI
@@ -167,7 +169,7 @@ def __init__(
167169
)
168170
if not disable_dependencies:
169171
super().__init__(
170-
executor=DependencyExecutor(
172+
executor=DependencyTaskScheduler(
171173
executor=create_flux_executor(
172174
max_workers=max_workers,
173175
cache_directory=cache_directory,
@@ -207,7 +209,7 @@ def __init__(
207209
)
208210

209211

210-
class FluxClusterExecutor(ExecutorInterface):
212+
class FluxClusterExecutor(ExecutorBase):
211213
"""
212214
The executorlib.Executor leverages either the message passing interface (MPI), the SLURM workload manager or
213215
preferable the flux framework for distributing python functions within a given resource allocation. In contrast to
@@ -251,7 +253,7 @@ class FluxClusterExecutor(ExecutorInterface):
251253
Examples:
252254
```
253255
>>> import numpy as np
254-
>>> from executorlib.interfaces.flux import FluxClusterExecutor
256+
>>> from executorlib.executor.flux import FluxClusterExecutor
255257
>>>
256258
>>> def calc(i, j, k):
257259
>>> from mpi4py import MPI
@@ -341,7 +343,9 @@ def __init__(
341343
{k: v for k, v in default_resource_dict.items() if k not in resource_dict}
342344
)
343345
if not plot_dependency_graph:
344-
from executorlib.cache.executor import create_file_executor
346+
from executorlib.task_scheduler.file.task_scheduler import (
347+
create_file_executor,
348+
)
345349

346350
super().__init__(
347351
executor=create_file_executor(
@@ -363,7 +367,7 @@ def __init__(
363367
)
364368
else:
365369
super().__init__(
366-
executor=DependencyExecutor(
370+
executor=DependencyTaskScheduler(
367371
executor=create_flux_executor(
368372
max_workers=max_workers,
369373
cache_directory=cache_directory,
@@ -397,7 +401,7 @@ def create_flux_executor(
397401
hostname_localhost: Optional[bool] = None,
398402
block_allocation: bool = False,
399403
init_function: Optional[Callable] = None,
400-
) -> Union[OneTaskPerProcessExecutor, BlockAllocationExecutor]:
404+
) -> Union[OneProcessTaskScheduler, BlockAllocationTaskScheduler]:
401405
"""
402406
Create a flux executor
403407
@@ -468,13 +472,13 @@ def create_flux_executor(
468472
cores=cores_per_worker,
469473
threads_per_core=resource_dict.get("threads_per_core", 1),
470474
)
471-
return BlockAllocationExecutor(
475+
return BlockAllocationTaskScheduler(
472476
max_workers=max_workers,
473477
executor_kwargs=resource_dict,
474478
spawner=FluxPythonSpawner,
475479
)
476480
else:
477-
return OneTaskPerProcessExecutor(
481+
return OneProcessTaskScheduler(
478482
max_cores=max_cores,
479483
max_workers=max_workers,
480484
executor_kwargs=resource_dict,

executorlib/interfaces/single.py renamed to executorlib/executor/single.py

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
11
from typing import Callable, Optional, Union
22

3-
from executorlib.base.executor import ExecutorInterface
4-
from executorlib.interactive.blockallocation import BlockAllocationExecutor
5-
from executorlib.interactive.dependency import DependencyExecutor
6-
from executorlib.interactive.onetoone import OneTaskPerProcessExecutor
3+
from executorlib.executor.base import ExecutorBase
74
from executorlib.standalone.inputcheck import (
85
check_command_line_argument_lst,
96
check_gpus_per_worker,
@@ -13,9 +10,14 @@
1310
validate_number_of_cores,
1411
)
1512
from executorlib.standalone.interactive.spawner import MpiExecSpawner
13+
from executorlib.task_scheduler.interactive.blockallocation import (
14+
BlockAllocationTaskScheduler,
15+
)
16+
from executorlib.task_scheduler.interactive.dependency import DependencyTaskScheduler
17+
from executorlib.task_scheduler.interactive.onetoone import OneProcessTaskScheduler
1618

1719

18-
class SingleNodeExecutor(ExecutorInterface):
20+
class SingleNodeExecutor(ExecutorBase):
1921
"""
2022
The executorlib.Executor leverages either the message passing interface (MPI), the SLURM workload manager or
2123
preferable the flux framework for distributing python functions within a given resource allocation. In contrast to
@@ -58,7 +60,7 @@ class SingleNodeExecutor(ExecutorInterface):
5860
Examples:
5961
```
6062
>>> import numpy as np
61-
>>> from executorlib.interfaces.single import SingleNodeExecutor
63+
>>> from executorlib.executor.single import SingleNodeExecutor
6264
>>>
6365
>>> def calc(i, j, k):
6466
>>> from mpi4py import MPI
@@ -147,7 +149,7 @@ def __init__(
147149
)
148150
if not disable_dependencies:
149151
super().__init__(
150-
executor=DependencyExecutor(
152+
executor=DependencyTaskScheduler(
151153
executor=create_single_node_executor(
152154
max_workers=max_workers,
153155
cache_directory=cache_directory,
@@ -187,7 +189,7 @@ def create_single_node_executor(
187189
hostname_localhost: Optional[bool] = None,
188190
block_allocation: bool = False,
189191
init_function: Optional[Callable] = None,
190-
) -> Union[OneTaskPerProcessExecutor, BlockAllocationExecutor]:
192+
) -> Union[OneProcessTaskScheduler, BlockAllocationTaskScheduler]:
191193
"""
192194
Create a single node executor
193195
@@ -241,7 +243,7 @@ def create_single_node_executor(
241243
del resource_dict["slurm_cmd_args"]
242244
if block_allocation:
243245
resource_dict["init_function"] = init_function
244-
return BlockAllocationExecutor(
246+
return BlockAllocationTaskScheduler(
245247
max_workers=validate_number_of_cores(
246248
max_cores=max_cores,
247249
max_workers=max_workers,
@@ -252,7 +254,7 @@ def create_single_node_executor(
252254
spawner=MpiExecSpawner,
253255
)
254256
else:
255-
return OneTaskPerProcessExecutor(
257+
return OneProcessTaskScheduler(
256258
max_cores=max_cores,
257259
max_workers=max_workers,
258260
executor_kwargs=resource_dict,

0 commit comments

Comments (0)