Skip to content

Commit 4568507

Browse files
Implement splitting of future objects (#819)
* Implement splitting of future objects * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Update split.py * Update split.py * fix tests * extend tests * cover all lines * Import from API * fixes * Add support for dictionaries * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * use dict function * move split to main * Add docstrings * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Test response is None * Rename splitfuture to futureselector * Rename module to select rather than split --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent d827ac5 commit 4568507

File tree

3 files changed

+157
-0
lines changed

3 files changed

+157
-0
lines changed

executorlib/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
SlurmClusterExecutor,
2626
SlurmJobExecutor,
2727
)
28+
from executorlib.standalone.select import get_item_from_future, split_future
2829

2930

3031
def get_cache_data(cache_directory: str) -> list[dict]:
@@ -66,6 +67,8 @@ def terminate_tasks_in_cache(
6667

6768
__all__: list[str] = [
6869
"get_cache_data",
70+
"get_item_from_future",
71+
"split_future",
6972
"terminate_tasks_in_cache",
7073
"BaseExecutor",
7174
"FluxJobExecutor",

executorlib/standalone/select.py

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
from concurrent.futures import Future
2+
from typing import Any, Optional
3+
4+
5+
class FutureSelector(Future):
6+
def __init__(self, future: Future, selector: int | str):
7+
super().__init__()
8+
self._future = future
9+
self._selector = selector
10+
11+
def cancel(self) -> bool:
12+
return self._future.cancel()
13+
14+
def cancelled(self) -> bool:
15+
return self._future.cancelled()
16+
17+
def running(self) -> bool:
18+
return self._future.running()
19+
20+
def done(self) -> bool:
21+
return self._future.done()
22+
23+
def add_done_callback(self, fn) -> None:
24+
return self._future.add_done_callback(fn=fn)
25+
26+
def result(self, timeout: Optional[float] = None) -> Any:
27+
result = self._future.result(timeout=timeout)
28+
if result is not None:
29+
return result[self._selector]
30+
else:
31+
return None
32+
33+
def exception(self, timeout: Optional[float] = None) -> Optional[BaseException]:
34+
return self._future.exception(timeout=timeout)
35+
36+
def set_running_or_notify_cancel(self) -> bool:
37+
return self._future.set_running_or_notify_cancel()
38+
39+
def set_result(self, result: Any) -> None:
40+
return self._future.set_result(result=result)
41+
42+
def set_exception(self, exception: Optional[BaseException]) -> None:
43+
return self._future.set_exception(exception=exception)
44+
45+
46+
def split_future(future: Future, n: int) -> list[FutureSelector]:
47+
"""
48+
Split a concurrent.futures.Future object which returns a tuple or list as result into individual future objects
49+
50+
Args:
51+
future (Future): future object which returns a tuple or list as result
52+
n: number of elements expected in the future object
53+
54+
Returns:
55+
list: List of future objects
56+
"""
57+
return [FutureSelector(future=future, selector=i) for i in range(n)]
58+
59+
60+
def get_item_from_future(future: Future, key: str) -> FutureSelector:
61+
"""
62+
Get item from concurrent.futures.Future object which returns a dictionary as result by the corresponding dictionary
63+
key.
64+
65+
Args:
66+
future (Future): future object which returns a dictionary as result
67+
key (str): dictionary key to get item from dictionary
68+
69+
Returns:
70+
FutureSelector: Future object which returns the value corresponding to the key
71+
"""
72+
return FutureSelector(future=future, selector=key)

tests/test_standalone_select.py

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
import unittest
2+
from concurrent.futures import Future
3+
from executorlib import SingleNodeExecutor, split_future, get_item_from_future
4+
from executorlib.api import cloudpickle_register
5+
from executorlib.standalone.select import FutureSelector
6+
7+
8+
def function_returns_tuple(i):
9+
return "a", "b", i
10+
11+
12+
def function_returns_dict(i):
13+
return {"a": 1, "b": 2, "c": i}
14+
15+
16+
def function_with_exception(i):
17+
raise RuntimeError()
18+
19+
20+
def callback(future):
21+
print("callback:", future.result())
22+
23+
24+
class TestSplitFuture(unittest.TestCase):
25+
def test_integration_return_tuple(self):
26+
with SingleNodeExecutor() as exe:
27+
cloudpickle_register(ind=1)
28+
future = exe.submit(function_returns_tuple, 15)
29+
f1, f2, f3 = split_future(future=future, n=3)
30+
self.assertEqual(f1.result(), "a")
31+
self.assertEqual(f2.result(), "b")
32+
self.assertEqual(f3.result(), 15)
33+
self.assertTrue(f1.done())
34+
self.assertTrue(f2.done())
35+
self.assertTrue(f3.done())
36+
37+
def test_integration_return_dict(self):
38+
with SingleNodeExecutor() as exe:
39+
cloudpickle_register(ind=1)
40+
future = exe.submit(function_returns_dict, 15)
41+
f1 = get_item_from_future(future=future, key="a")
42+
f2 = get_item_from_future(future=future, key="b")
43+
f3 = get_item_from_future(future=future, key="c")
44+
self.assertEqual(f1.result(), 1)
45+
self.assertEqual(f2.result(), 2)
46+
self.assertEqual(f3.result(), 15)
47+
self.assertTrue(f1.done())
48+
self.assertTrue(f2.done())
49+
self.assertTrue(f3.done())
50+
51+
def test_integration_exception(self):
52+
with SingleNodeExecutor() as exe:
53+
cloudpickle_register(ind=1)
54+
future = exe.submit(function_with_exception, 15)
55+
f1, f2, f3 = split_future(future=future, n=3)
56+
with self.assertRaises(RuntimeError):
57+
f3.result()
58+
59+
def test_split_future_object(self):
60+
f1 = Future()
61+
fs1 = FutureSelector(future=f1, selector=1)
62+
fs1.add_done_callback(callback)
63+
fs1.set_running_or_notify_cancel()
64+
self.assertTrue(fs1.running())
65+
fs1.set_result([1, 2])
66+
self.assertEqual(fs1.result(), 2)
67+
f2 = Future()
68+
fs2 = FutureSelector(future=f2, selector=1)
69+
fs2.cancel()
70+
self.assertTrue(fs2.cancelled())
71+
f3 = Future()
72+
fs3 = FutureSelector(future=f3, selector=1)
73+
fs3.set_running_or_notify_cancel()
74+
self.assertTrue(fs3.running())
75+
fs3.set_result(None)
76+
self.assertEqual(fs3.result(), None)
77+
f4 = Future()
78+
fs4 = FutureSelector(future=f4, selector=1)
79+
fs4.set_exception(RuntimeError())
80+
self.assertEqual(type(fs4.exception()), RuntimeError)
81+
with self.assertRaises(RuntimeError):
82+
fs4.result()

0 commit comments

Comments
 (0)