-
Notifications
You must be signed in to change notification settings - Fork 115
/
Copy pathconcurrency.py
46 lines (34 loc) · 1.25 KB
/
concurrency.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
"""
Some of our tests require branching of flow control.
We'd like to have the same kind of test for both async and sync environments,
and so we have functionality here that replicates Trio's `open_nursery` API,
but in a plain old multi-threaded context.
We don't do any smarts around cancellations, or managing exceptions from
children, because we don't need that for our use-case.
"""
import threading
import time
from types import TracebackType
from typing import Any, Callable, List, Optional, Type
class Nursery:
    """Thread-backed stand-in for the nursery returned by Trio's `open_nursery`.

    Callables registered with `start_soon` are not run straight away: each one
    is wrapped in its own `threading.Thread`, and those threads are started
    and then joined when the `with` block exits.

    There is no cancellation support, and exceptions raised inside a child
    are not re-raised in the thread that owns the nursery.
    """

    def __init__(self) -> None:
        # Threads registered through `start_soon` that have not been started.
        self._threads: List[threading.Thread] = []

    def __enter__(self) -> "Nursery":
        """Return the nursery itself so it can be bound with `as`."""
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]] = None,
        exc_value: Optional[BaseException] = None,
        traceback: Optional[TracebackType] = None,
    ) -> None:
        """Start every pending thread and block until all of them finish.

        The exception arguments are ignored: the children run whether or not
        the `with` body raised, and because this returns `None` an exception
        from the body is not suppressed.
        """
        # Work through the pending threads in batches. Swapping in a fresh
        # list before starting anything means that:
        #   * a finished thread is never started twice if the same nursery is
        #     used in a second `with` block (`Thread.start()` would raise
        #     `RuntimeError`), and
        #   * a child that calls `start_soon` while it runs registers into the
        #     new list instead of the one being iterated, and gets picked up
        #     by the next pass of the loop.
        while self._threads:
            batch, self._threads = self._threads, []
            for thread in batch:
                thread.start()
            for thread in batch:
                thread.join()

    def start_soon(self, func: Callable[..., object], *args: Any) -> None:
        """Register `func(*args)` to run in its own thread on nursery exit.

        The thread is only created here; it is started by `__exit__`.
        """
        thread = threading.Thread(target=func, args=args)
        self._threads.append(thread)
def open_nursery() -> Nursery:
    """Create a new, empty `Nursery` for use as a context manager."""
    nursery = Nursery()
    return nursery
def sleep(seconds: float) -> None:
    """Block the calling thread for `seconds` seconds using `time.sleep`."""
    time.sleep(seconds)
    return None