diff --git a/src/behavioral/generator.py b/src/behavioral/generator.py index f7f97f2..58e9c50 100644 --- a/src/behavioral/generator.py +++ b/src/behavioral/generator.py @@ -11,7 +11,6 @@ from __future__ import annotations -import io from abc import ABC, abstractmethod from sqlite3 import Connection from types import TracebackType @@ -20,22 +19,6 @@ from src.behavioral.iterator import Iterator -class Generator[YieldT, SendT, ReturnT](Iterator[YieldT], ABC): - @abstractmethod - def send(self, value: SendT) -> YieldT: ... - - @abstractmethod - def throw( - self, - exc_type: type[BaseException], - exc_val: BaseException | None = None, - tb: TracebackType | None = None, - ) -> Self: ... - - @abstractmethod - def close(self) -> None: ... - - # Basic usage @@ -72,9 +55,50 @@ def gen_sum() -> PythonGenerator[int, int, int]: return total +class CommitException(Exception): + pass + + +class AbortException(Exception): + pass + + +def db_session( + db: Connection, sql: str +) -> PythonGenerator[None, tuple[Any, ...], None]: + cursor = db.cursor() + try: + while True: + try: + row = yield + cursor.execute(sql, row) + except CommitException: + db.commit() + except AbortException: + db.rollback() + finally: + db.rollback() + + # Class-based generator +class Generator[YieldT, SendT, ReturnT](Iterator[YieldT], ABC): + @abstractmethod + def send(self, value: SendT) -> YieldT: ... + + @abstractmethod + def throw( + self, + exc_type: type[BaseException], + exc_val: BaseException | None = None, + tb: TracebackType | None = None, + ) -> Self: ... + + @abstractmethod + def close(self) -> None: ... + + class SumGenerator(Generator[int, int, int]): def __init__(self) -> None: self._total = 0 @@ -102,43 +126,3 @@ def __iter__(self) -> SumGenerator: def __next__(self) -> int: return self.send(0) - - -def gen_line( - output: io.StringIO, state: dict[str, Any] -) -> PythonGenerator[str, None, None]: - # lines - try: - while True: - line = output.readline().rstrip() - if not line: - break - yield line - finally: - state["closed"] = True - output.close() - - -class CommitException(Exception): - pass - - -class AbortException(Exception): - pass - - -def db_session( - db: Connection, sql: str -) -> PythonGenerator[None, tuple[Any, ...], None]: - cursor = db.cursor() - try: - while True: - try: - row = yield - cursor.execute(sql, row) - except CommitException: - db.commit() - except AbortException: - db.rollback() - finally: - db.rollback() diff --git a/src/concurrency/patterns/active_object.py b/src/concurrency/examples/coroutines_sleep.py similarity index 100% rename from src/concurrency/patterns/active_object.py rename to src/concurrency/examples/coroutines_sleep.py diff --git a/src/concurrency/examples/generators_sleep.py b/src/concurrency/examples/generators_sleep.py new file mode 100644 index 0000000..3150ff0 --- /dev/null +++ b/src/concurrency/examples/generators_sleep.py @@ -0,0 +1 @@ +# @coroutine decorator as asyncio.run diff --git a/src/concurrency/patterns/asyncio.py b/src/concurrency/patterns/asyncio.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/concurrency/patterns/coroutine.py b/src/concurrency/patterns/coroutine.py new file mode 100644 index 0000000..0336ed5 --- /dev/null +++ b/src/concurrency/patterns/coroutine.py @@ -0,0 +1,17 @@ +from abc import abstractmethod, ABC +from typing import Any, Generator + + +GeneratorCoroutine = Generator + + +class Awaitable[T](ABC): + @abstractmethod + def __await__(self) -> Generator[Any, Any, T]: ... + + +class AwaitableCoroutine[YieldT, SendT, ReturnT]( + Awaitable[ReturnT], Generator[YieldT, SendT, ReturnT], ABC +): + @abstractmethod + def __await__(self) -> Generator[YieldT, SendT, ReturnT]: ... diff --git a/src/concurrency/patterns/event_loop.py b/src/concurrency/patterns/event_loop.py index f41a207..05b3ad2 100644 --- a/src/concurrency/patterns/event_loop.py +++ b/src/concurrency/patterns/event_loop.py @@ -1 +1,99 @@ -# https://medium.com/@pekelny/fake-event-loop-python3-7498761af5e0 +""" +Event Loop is a concurrency design pattern that is used to handle asynchronous events in a program. It is a loop that +listens for events and then triggers the appropriate event handlers. The Event Loop pattern is commonly used in GUI +applications, web servers, and other programs that need to handle multiple events simultaneously. + +A Task is a subclass of Future that represents a coroutine that is running in the event loop. It is used to +manage the execution of the coroutine and handle its result. +""" + +from __future__ import annotations + +import logging +from queue import Queue +from typing import Any + +from src.concurrency.patterns.coroutine import GeneratorCoroutine +from src.concurrency.patterns.future import Future + +logger = logging.getLogger(__name__) + + +class Task[T](Future[T]): + def __init__( + self, gen: GeneratorCoroutine[Any, Any, T], loop: EventLoop + ) -> None: + super().__init__() + self.gen: GeneratorCoroutine[Any, Any, T] = gen + self.loop: EventLoop = loop + loop.put(self) + + def step(self, value: Any = None) -> None: + # Resume the coroutine + try: + yielded = self.gen.send(value) + # If the coroutine yielded a Future, add a callback to resume the coroutine when the Future is done + if isinstance(yielded, Future): + yielded.add_done_callback(lambda fut: self.step(fut.result())) + else: + self.loop.put(self) + # Coroutine has finished + except StopIteration as e: + self.set_result(e.value) + # Coroutine raised an exception + except Exception as e: + self.set_exception(e) + + +class EventLoop: + def __init__(self) -> None: + self.q: Queue[Task[Any]] = Queue() + + def put(self, task: Task[Any]) -> None: + self.q.put(task) + + def run_until_complete[T]( + self, coro: GeneratorCoroutine[Any, Any, T] + ) -> T: + task = create_task(coro) + while not task.done(): + if not self.q.empty(): + next_task = self.q.get() + next_task.step() + return task.result() + + def close(self) -> None: + self.q.queue.clear() + + +running_loop: EventLoop | None = None + + +def new_event_loop() -> EventLoop: + return EventLoop() + + +def get_running_loop() -> EventLoop | None: + return running_loop + + +def get_event_loop() -> EventLoop: + loop = get_running_loop() + if loop is not None: + return loop + return new_event_loop() + + +def create_task[T](coro: GeneratorCoroutine[Any, Any, T]) -> Task[T]: + return Task(coro, get_event_loop()) + + +def run[T](coro: GeneratorCoroutine[Any, Any, T]) -> T: + global running_loop + loop = get_event_loop() + running_loop = loop + try: + return loop.run_until_complete(coro) + finally: + running_loop = None + loop.close() diff --git a/src/concurrency/patterns/future.py b/src/concurrency/patterns/future.py new file mode 100644 index 0000000..badf7e9 --- /dev/null +++ b/src/concurrency/patterns/future.py @@ -0,0 +1,53 @@ +""" +A Future is an object that represents the result of an asynchronous operation. It is used to store the result + of the operation and notify the caller when the operation is complete. +""" + +from __future__ import annotations + +from typing import Callable, Generator +from src.concurrency.patterns.coroutine import Awaitable + + +class Future[T](Awaitable[T]): + def __init__(self) -> None: + self._done: bool = False + self._result: T | None = None + self._exception: BaseException | None = None + self._callbacks: list[Callable[[Future[T]], None]] = [] + + def done(self) -> bool: + return self._done + + def result(self) -> T: + if not self._done: + raise RuntimeError("Future is not done yet") + if self._exception: + raise self._exception + return self._result # type: ignore + + def set_result(self, result: T) -> None: + self._result = result + self._done = True + self._schedule_callbacks() + + def set_exception(self, exception: BaseException) -> None: + self._exception = exception + self._done = True + self._schedule_callbacks() + + def add_done_callback(self, callback: Callable[[Future[T]], None]) -> None: + self._callbacks.append(callback) + if self._done: + self._schedule_callbacks() + + def _schedule_callbacks(self) -> None: + for callback in self._callbacks: + callback(self) + + def __await__(self) -> Generator[Future[T], None, T]: + if not self._done: + yield self + return self.result() + + __iter__ = __await__ diff --git a/src/concurrency/patterns/join.py b/src/concurrency/patterns/join.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/concurrency/patterns/lock.py b/src/concurrency/patterns/lock.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/concurrency/patterns/process_pool.py b/src/concurrency/patterns/process_pool.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/concurrency/patterns/rwlock.py b/src/concurrency/patterns/rwlock.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/concurrency/patterns/scheduler.py b/src/concurrency/patterns/scheduler.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/concurrency/patterns/semaphore.py b/src/concurrency/patterns/semaphore.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/concurrency/patterns/thread_pool.py b/src/concurrency/patterns/thread_pool.py deleted file mode 100644 index e69de29..0000000 diff --git a/tests/test_behavioral.py b/tests/test_behavioral.py index c58c44b..6d37931 100644 --- a/tests/test_behavioral.py +++ b/tests/test_behavioral.py @@ -1,4 +1,3 @@ -import io import sqlite3 from typing import Generator as PythonGenerator @@ -13,7 +12,6 @@ count_delegation, gen_sum, SumGenerator, - gen_line, db_session, CommitException, ) @@ -316,22 +314,6 @@ def test_gen_sum(impl: type[PythonGenerator[int, int, int]]) -> None: assert e.value.value == 15 -def test_gen_line() -> None: - # A file-like object - output = io.StringIO() - output.write("First line\n") - output.write("Second line\n") - output.write("Third line\n") - output.seek(0) - state = {"closed": False} - - g = gen_line(output, state) - assert next(g) == "First line" - assert next(g) == "Second line" - assert next(g) == "Third line" - g.close() - - def test_db_session() -> None: db_url = ":memory:" conn = sqlite3.connect(db_url) diff --git a/tests/test_concurrency.py b/tests/test_concurrency.py new file mode 100644 index 0000000..25b621f --- /dev/null +++ b/tests/test_concurrency.py @@ -0,0 +1,55 @@ +import time +from typing import Any + +from src.concurrency.patterns.coroutine import ( + GeneratorCoroutine, +) +from src.concurrency.patterns.event_loop import run, create_task + + +# I/O bound task +def sleep(seconds: int) -> GeneratorCoroutine[None, None, None]: + yield + start_time = time.time() + while time.time() - start_time < seconds: + yield + + +def get(_url: str) -> GeneratorCoroutine[None, None, str]: + yield from sleep(1) + return "OK" + + +def generate_text() -> GeneratorCoroutine[None, None, str]: + # health check + response = yield from get("https://api.openai.com") + if response != "OK": + return response + # generate text + response = yield from get( + "https://api.openai.com/generate?prompt=what is the meaning of life?" + ) + return response + + +def test_generate_text() -> None: + assert run(generate_text()) == "OK" + + +def multiple_generate_text( + n: int, +) -> GeneratorCoroutine[Any, None, list[str]]: + yield + # Load tasks in event queue + tasks = [create_task(generate_text()) for _ in range(n)] + responses = [] + # Await tasks + for task in tasks: + response = yield from task + responses.append(response) + return responses + + +def test_multiple_generate_text() -> None: + coro = multiple_generate_text(100) + assert run(coro) == ["OK"] * 100