Skip to content

Commit 8860291

Browse files
[feature] resize block allocation executor (#589)
* [feature] resize block allocation executor * extend test * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * fixes * fix type check * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * fixes * test property * test with dependencies * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * fixes and increased test coverage * fix base property --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent 3f3f212 commit 8860291

File tree

6 files changed

+186
-9
lines changed

6 files changed

+186
-9
lines changed

executorlib/base/executor.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,14 @@ def __init__(self, max_cores: Optional[int] = None):
3232
self._future_queue: Optional[queue.Queue] = queue.Queue()
3333
self._process: Optional[Union[Thread, list[Thread]]] = None
3434

35+
@property
36+
def max_workers(self) -> Optional[int]:
37+
return self._process_kwargs.get("max_workers")
38+
39+
@max_workers.setter
40+
def max_workers(self, max_workers: int):
41+
raise NotImplementedError("The max_workers setter is not implemented.")
42+
3543
@property
3644
def info(self) -> Optional[dict]:
3745
"""

executorlib/interactive/blockallocation.py

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import queue
12
from concurrent.futures import Future
23
from threading import Thread
34
from typing import Callable, Optional
@@ -27,7 +28,7 @@ class BlockAllocationExecutor(ExecutorBase):
2728
Examples:
2829
2930
>>> import numpy as np
30-
>>> from executorlib.interactive.shared import BlockAllocationExecutor
31+
>>> from executorlib.interactive.blockallocation import BlockAllocationExecutor
3132
>>>
3233
>>> def calc(i, j, k):
3334
>>> from mpi4py import MPI
@@ -58,16 +59,46 @@ def __init__(
5859
executor_kwargs["spawner"] = spawner
5960
executor_kwargs["queue_join_on_shutdown"] = False
6061
self._process_kwargs = executor_kwargs
62+
self._max_workers = max_workers
6163
self._set_process(
6264
process=[
6365
Thread(
6466
target=execute_tasks,
6567
kwargs=executor_kwargs,
6668
)
67-
for _ in range(max_workers)
69+
for _ in range(self._max_workers)
6870
],
6971
)
7072

73+
@property
74+
def max_workers(self) -> int:
75+
return self._max_workers
76+
77+
@max_workers.setter
78+
def max_workers(self, max_workers: int):
79+
if isinstance(self._future_queue, queue.Queue) and isinstance(
80+
self._process, list
81+
):
82+
if self._max_workers > max_workers:
83+
for _ in range(self._max_workers - max_workers):
84+
self._future_queue.queue.insert(0, {"shutdown": True, "wait": True})
85+
while len(self._process) > max_workers:
86+
self._process = [
87+
process for process in self._process if process.is_alive()
88+
]
89+
elif self._max_workers < max_workers:
90+
new_process_lst = [
91+
Thread(
92+
target=execute_tasks,
93+
kwargs=self._process_kwargs,
94+
)
95+
for _ in range(max_workers - self._max_workers)
96+
]
97+
for process_instance in new_process_lst:
98+
process_instance.start()
99+
self._process += new_process_lst
100+
self._max_workers = max_workers
101+
71102
def submit( # type: ignore
72103
self, fn: Callable, *args, resource_dict: Optional[dict] = None, **kwargs
73104
) -> Future:

executorlib/interactive/dependency.py

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,12 +76,39 @@ def info(self) -> Optional[dict]:
7676
if isinstance(self._future_queue, queue.Queue):
7777
f: Future = Future()
7878
self._future_queue.queue.insert(
79-
0, {"internal": True, "task": "info", "future": f}
79+
0, {"internal": True, "task": "get_info", "future": f}
8080
)
8181
return f.result()
8282
else:
8383
return None
8484

85+
@property
86+
def max_workers(self) -> Optional[int]:
87+
if isinstance(self._future_queue, queue.Queue):
88+
f: Future = Future()
89+
self._future_queue.queue.insert(
90+
0, {"internal": True, "task": "get_max_workers", "future": f}
91+
)
92+
return f.result()
93+
else:
94+
return None
95+
96+
@max_workers.setter
97+
def max_workers(self, max_workers: int):
98+
if isinstance(self._future_queue, queue.Queue):
99+
f: Future = Future()
100+
self._future_queue.queue.insert(
101+
0,
102+
{
103+
"internal": True,
104+
"task": "set_max_workers",
105+
"max_workers": max_workers,
106+
"future": f,
107+
},
108+
)
109+
if not f.result():
110+
raise NotImplementedError("The max_workers setter is not implemented.")
111+
85112
def submit( # type: ignore
86113
self,
87114
fn: Callable[..., Any],
@@ -188,8 +215,17 @@ def _execute_tasks_with_dependencies(
188215
if ( # shutdown the executor
189216
task_dict is not None and "internal" in task_dict and task_dict["internal"]
190217
):
191-
if task_dict["task"] == "info":
218+
if task_dict["task"] == "get_info":
192219
task_dict["future"].set_result(executor.info)
220+
elif task_dict["task"] == "get_max_workers":
221+
task_dict["future"].set_result(executor.max_workers)
222+
elif task_dict["task"] == "set_max_workers":
223+
try:
224+
executor.max_workers = task_dict["max_workers"]
225+
except NotImplementedError:
226+
task_dict["future"].set_result(False)
227+
else:
228+
task_dict["future"].set_result(True)
193229
elif ( # handle function submitted to the executor
194230
task_dict is not None and "fn" in task_dict and "future" in task_dict
195231
):

executorlib/interactive/shared.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import contextlib
12
import importlib.util
23
import os
34
import queue
@@ -57,7 +58,7 @@ def execute_tasks(
5758
task_dict = future_queue.get()
5859
if "shutdown" in task_dict and task_dict["shutdown"]:
5960
interface.shutdown(wait=task_dict["wait"])
60-
future_queue.task_done()
61+
_task_done(future_queue=future_queue)
6162
if queue_join_on_shutdown:
6263
future_queue.join()
6364
break
@@ -117,10 +118,10 @@ def _execute_task_without_cache(
117118
f.set_result(interface.send_and_receive_dict(input_dict=task_dict))
118119
except Exception as thread_exception:
119120
interface.shutdown(wait=True)
120-
future_queue.task_done()
121+
_task_done(future_queue=future_queue)
121122
f.set_exception(exception=thread_exception)
122123
else:
123-
future_queue.task_done()
124+
_task_done(future_queue=future_queue)
124125

125126

126127
def _execute_task_with_cache(
@@ -161,13 +162,18 @@ def _execute_task_with_cache(
161162
f.set_result(result)
162163
except Exception as thread_exception:
163164
interface.shutdown(wait=True)
164-
future_queue.task_done()
165+
_task_done(future_queue=future_queue)
165166
f.set_exception(exception=thread_exception)
166167
raise thread_exception
167168
else:
168-
future_queue.task_done()
169+
_task_done(future_queue=future_queue)
169170
else:
170171
_, result = get_output(file_name=file_name)
171172
future = task_dict["future"]
172173
future.set_result(result)
174+
_task_done(future_queue=future_queue)
175+
176+
177+
def _task_done(future_queue: queue.Queue):
178+
with contextlib.suppress(ValueError):
173179
future_queue.task_done()

tests/test_local_executor.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,14 @@ def test_pympiexecutor_two_workers(self):
7777
self.assertTrue(fs_1.done())
7878
self.assertTrue(fs_2.done())
7979

80+
def test_max_workers(self):
81+
with BlockAllocationExecutor(
82+
max_workers=2,
83+
executor_kwargs={},
84+
spawner=MpiExecSpawner,
85+
) as exe:
86+
self.assertEqual(exe.max_workers, 2)
87+
8088
def test_pympiexecutor_one_worker(self):
8189
with BlockAllocationExecutor(
8290
max_workers=1,
@@ -107,6 +115,14 @@ def test_pympiexecutor_two_workers(self):
107115
self.assertTrue(fs_1.done())
108116
self.assertTrue(fs_2.done())
109117

118+
def test_max_workers(self):
119+
with OneTaskPerProcessExecutor(
120+
max_workers=2,
121+
executor_kwargs={},
122+
spawner=MpiExecSpawner,
123+
) as exe:
124+
self.assertEqual(exe.max_workers, 2)
125+
110126
def test_pympiexecutor_one_worker(self):
111127
with OneTaskPerProcessExecutor(
112128
max_cores=1,
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
import unittest
2+
from executorlib import SingleNodeExecutor
3+
from executorlib.standalone.serialize import cloudpickle_register
4+
5+
6+
def sleep_funct(sec):
7+
from time import sleep
8+
sleep(sec)
9+
return sec
10+
11+
12+
class TestResizing(unittest.TestCase):
13+
def test_without_dependencies_decrease(self):
14+
cloudpickle_register(ind=1)
15+
with SingleNodeExecutor(max_workers=2, block_allocation=True, disable_dependencies=True) as exe:
16+
future_lst = [exe.submit(sleep_funct, 1) for _ in range(4)]
17+
self.assertEqual([f.done() for f in future_lst], [False, False, False, False])
18+
self.assertEqual(len(exe), 4)
19+
sleep_funct(sec=0.5)
20+
exe.max_workers = 1
21+
self.assertTrue(len(exe) >= 1)
22+
self.assertEqual(len(exe._process), 1)
23+
self.assertTrue(1 <= sum([f.done() for f in future_lst]) < 3)
24+
self.assertEqual([f.result() for f in future_lst], [1, 1, 1, 1])
25+
self.assertEqual([f.done() for f in future_lst], [True, True, True, True])
26+
27+
def test_without_dependencies_increase(self):
28+
cloudpickle_register(ind=1)
29+
with SingleNodeExecutor(max_workers=1, block_allocation=True, disable_dependencies=True) as exe:
30+
future_lst = [exe.submit(sleep_funct, 0.1) for _ in range(4)]
31+
self.assertEqual([f.done() for f in future_lst], [False, False, False, False])
32+
self.assertEqual(len(exe), 4)
33+
self.assertEqual(exe.max_workers, 1)
34+
future_lst[0].result()
35+
exe.max_workers = 2
36+
self.assertEqual(exe.max_workers, 2)
37+
self.assertTrue(len(exe) >= 1)
38+
self.assertEqual(len(exe._process), 2)
39+
self.assertEqual([f.done() for f in future_lst], [True, False, False, False])
40+
self.assertEqual([f.result() for f in future_lst], [0.1, 0.1, 0.1, 0.1])
41+
self.assertEqual([f.done() for f in future_lst], [True, True, True, True])
42+
43+
def test_with_dependencies_decrease(self):
44+
cloudpickle_register(ind=1)
45+
with SingleNodeExecutor(max_workers=2, block_allocation=True, disable_dependencies=False) as exe:
46+
future_lst = [exe.submit(sleep_funct, 1) for _ in range(4)]
47+
self.assertEqual([f.done() for f in future_lst], [False, False, False, False])
48+
self.assertEqual(len(exe), 4)
49+
sleep_funct(sec=0.5)
50+
exe.max_workers = 1
51+
self.assertTrue(1 <= sum([f.done() for f in future_lst]) < 3)
52+
self.assertEqual([f.result() for f in future_lst], [1, 1, 1, 1])
53+
self.assertEqual([f.done() for f in future_lst], [True, True, True, True])
54+
55+
def test_with_dependencies_increase(self):
56+
cloudpickle_register(ind=1)
57+
with SingleNodeExecutor(max_workers=1, block_allocation=True, disable_dependencies=False) as exe:
58+
future_lst = [exe.submit(sleep_funct, 0.1) for _ in range(4)]
59+
self.assertEqual([f.done() for f in future_lst], [False, False, False, False])
60+
self.assertEqual(len(exe), 4)
61+
self.assertEqual(exe.max_workers, 1)
62+
future_lst[0].result()
63+
exe.max_workers = 2
64+
self.assertEqual(exe.max_workers, 2)
65+
self.assertEqual([f.done() for f in future_lst], [True, False, False, False])
66+
self.assertEqual([f.result() for f in future_lst], [0.1, 0.1, 0.1, 0.1])
67+
self.assertEqual([f.done() for f in future_lst], [True, True, True, True])
68+
69+
def test_no_block_allocation(self):
70+
with self.assertRaises(NotImplementedError):
71+
with SingleNodeExecutor(block_allocation=False, disable_dependencies=False) as exe:
72+
exe.max_workers = 2
73+
with self.assertRaises(NotImplementedError):
74+
with SingleNodeExecutor(block_allocation=False, disable_dependencies=True) as exe:
75+
exe.max_workers = 2
76+
77+
def test_max_workers_stopped_executor(self):
78+
exe = SingleNodeExecutor(block_allocation=True)
79+
exe.shutdown(wait=True)
80+
self.assertIsNone(exe.max_workers)

0 commit comments

Comments
 (0)