Skip to content

Commit 4ab32b6

Browse files
authored
Add worker ID (#748)
* Add worker ID * extend tests * fix broken tests * Add minimal sleep to fix execution order * increase sleep interval * Rename variable to executorlib_worker_id * Include executorlib_worker_id in the call of the init function
1 parent 64f3897 commit 4ab32b6

File tree

8 files changed

+85
-7
lines changed

8 files changed

+85
-7
lines changed

executorlib/backend/interactive_parallel.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ def main() -> None:
4343
host=argument_dict["host"], port=argument_dict["zmqport"]
4444
)
4545

46-
memory = None
46+
memory = {"executorlib_worker_id": int(argument_dict["worker_id"])}
4747

4848
# required for flux interface - otherwise the current path is not included in the python path
4949
cwd = abspath(".")
@@ -97,7 +97,7 @@ def main() -> None:
9797
and "args" in input_dict
9898
and "kwargs" in input_dict
9999
):
100-
memory = call_funct(input_dict=input_dict, funct=None)
100+
memory.update(call_funct(input_dict=input_dict, funct=None, memory=memory))
101101

102102

103103
if __name__ == "__main__":

executorlib/backend/interactive_serial.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ def main(argument_lst: Optional[list[str]] = None):
2929
host=argument_dict["host"], port=argument_dict["zmqport"]
3030
)
3131

32-
memory = None
32+
memory = {"executorlib_worker_id": int(argument_dict["worker_id"])}
3333

3434
# required for flux interface - otherwise the current path is not included in the python path
3535
cwd = abspath(".")
@@ -72,7 +72,7 @@ def main(argument_lst: Optional[list[str]] = None):
7272
and "args" in input_dict
7373
and "kwargs" in input_dict
7474
):
75-
memory = call_funct(input_dict=input_dict, funct=None)
75+
memory.update(call_funct(input_dict=input_dict, funct=None, memory=memory))
7676

7777

7878
if __name__ == "__main__":

executorlib/standalone/interactive/backend.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,9 @@ def parse_arguments(argument_lst: list[str]) -> dict:
4848
argument_dict={
4949
"zmqport": "--zmqport",
5050
"host": "--host",
51+
"worker_id": "--worker-id",
5152
},
52-
default_dict={"host": "localhost"},
53+
default_dict={"host": "localhost", "worker_id": 0},
5354
)
5455

5556

executorlib/standalone/interactive/communication.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ def interface_bootup(
136136
connections,
137137
hostname_localhost: Optional[bool] = None,
138138
log_obj_size: bool = False,
139+
worker_id: Optional[int] = None,
139140
) -> SocketInterface:
140141
"""
141142
Start interface for ZMQ communication
@@ -152,6 +153,8 @@ def interface_bootup(
152153
this look up for security reasons. So on MacOS it is required to set this
153154
option to true
154155
log_obj_size (boolean): Enable debug mode which reports the size of the communicated objects.
156+
worker_id (int): ID assigned to the worker, communicated to it for future reference and resource
157+
distribution.
155158
156159
Returns:
157160
executorlib.shared.communication.SocketInterface: socket interface for zmq communication
@@ -165,6 +168,8 @@ def interface_bootup(
165168
"--host",
166169
gethostname(),
167170
]
171+
if worker_id is not None:
172+
command_lst += ["--worker-id", str(worker_id)]
168173
interface = SocketInterface(
169174
spawner=connections,
170175
log_obj_size=log_obj_size,

executorlib/task_scheduler/interactive/blockallocation.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,9 +65,9 @@ def __init__(
6565
process=[
6666
Thread(
6767
target=execute_tasks,
68-
kwargs=executor_kwargs,
68+
kwargs=executor_kwargs | {"worker_id": worker_id},
6969
)
70-
for _ in range(self._max_workers)
70+
for worker_id in range(self._max_workers)
7171
],
7272
)
7373

executorlib/task_scheduler/interactive/shared.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ def execute_tasks(
2424
queue_join_on_shutdown: bool = True,
2525
log_obj_size: bool = False,
2626
error_log_file: Optional[str] = None,
27+
worker_id: Optional[int] = None,
2728
**kwargs,
2829
) -> None:
2930
"""
@@ -48,6 +49,8 @@ def execute_tasks(
4849
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
4950
error_log_file (str): Name of the error log file to use for storing exceptions raised by the Python functions
5051
submitted to the Executor.
52+
worker_id (int): ID assigned to the worker, communicated to it for future reference and resource
53+
distribution.
5154
"""
5255
interface = interface_bootup(
5356
command_lst=get_interactive_execute_command(
@@ -56,6 +59,7 @@ def execute_tasks(
5659
connections=spawner(cores=cores, **kwargs),
5760
hostname_localhost=hostname_localhost,
5861
log_obj_size=log_obj_size,
62+
worker_id=worker_id,
5963
)
6064
if init_function is not None:
6165
interface.send_dict(

tests/test_singlenodeexecutor_noblock.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import unittest
2+
from time import sleep
23

34
from executorlib import SingleNodeExecutor
45
from executorlib.standalone.serialize import cloudpickle_register
@@ -12,6 +13,15 @@ def resource_dict(resource_dict):
1213
return resource_dict
1314

1415

16+
def get_worker_id(executorlib_worker_id):
17+
sleep(0.1)
18+
return executorlib_worker_id
19+
20+
21+
def init_function():
22+
return {"a": 1, "b": 2}
23+
24+
1525
class TestExecutorBackend(unittest.TestCase):
1626
def test_meta_executor_serial_with_dependencies(self):
1727
with SingleNodeExecutor(
@@ -75,3 +85,58 @@ def test_errors(self):
7585
block_allocation=True,
7686
) as exe:
7787
exe.submit(resource_dict, resource_dict={})
88+
89+
90+
class TestWorkerID(unittest.TestCase):
91+
def test_block_allocation_True(self):
92+
with SingleNodeExecutor(
93+
max_cores=1,
94+
block_allocation=True,
95+
) as exe:
96+
worker_id = exe.submit(get_worker_id, resource_dict={}).result()
97+
self.assertEqual(worker_id, 0)
98+
99+
def test_block_allocation_True_two_workers(self):
100+
with SingleNodeExecutor(
101+
max_cores=2,
102+
block_allocation=True,
103+
) as exe:
104+
f1_worker_id = exe.submit(get_worker_id, resource_dict={})
105+
f2_worker_id = exe.submit(get_worker_id, resource_dict={})
106+
self.assertEqual(sum([f1_worker_id.result(), f2_worker_id.result()]), 1)
107+
108+
def test_init_function(self):
109+
with SingleNodeExecutor(
110+
max_cores=1,
111+
block_allocation=True,
112+
init_function=init_function,
113+
) as exe:
114+
worker_id = exe.submit(get_worker_id, resource_dict={}).result()
115+
self.assertEqual(worker_id, 0)
116+
117+
def test_init_function_two_workers(self):
118+
with SingleNodeExecutor(
119+
max_cores=2,
120+
block_allocation=True,
121+
init_function=init_function,
122+
) as exe:
123+
f1_worker_id = exe.submit(get_worker_id, resource_dict={})
124+
f2_worker_id = exe.submit(get_worker_id, resource_dict={})
125+
self.assertEqual(sum([f1_worker_id.result(), f2_worker_id.result()]), 1)
126+
127+
def test_block_allocation_False(self):
128+
with SingleNodeExecutor(
129+
max_cores=1,
130+
block_allocation=False,
131+
) as exe:
132+
worker_id = exe.submit(get_worker_id, resource_dict={}).result()
133+
self.assertEqual(worker_id, 0)
134+
135+
def test_block_allocation_False_two_workers(self):
136+
with SingleNodeExecutor(
137+
max_cores=2,
138+
block_allocation=False,
139+
) as exe:
140+
f1_worker_id = exe.submit(get_worker_id, resource_dict={})
141+
f2_worker_id = exe.submit(get_worker_id, resource_dict={})
142+
self.assertEqual(sum([f1_worker_id.result(), f2_worker_id.result()]), 0)

tests/test_standalone_interactive_backend.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ class TestParser(unittest.TestCase):
1111
def test_command_local(self):
1212
result_dict = {
1313
"host": "localhost",
14+
"worker_id": 0,
1415
"zmqport": "22",
1516
}
1617
command_lst = [
@@ -35,6 +36,7 @@ def test_command_local(self):
3536
def test_command_slurm(self):
3637
result_dict = {
3738
"host": "127.0.0.1",
39+
"worker_id": 0,
3840
"zmqport": "22",
3941
}
4042
command_lst = [
@@ -76,6 +78,7 @@ def test_command_slurm(self):
7678
def test_command_slurm_user_command(self):
7779
result_dict = {
7880
"host": "127.0.0.1",
81+
"worker_id": 0,
7982
"zmqport": "22",
8083
}
8184
command_lst = [

0 commit comments

Comments
 (0)