Skip to content

Commit

Permalink
Add Event Loop pattern
Browse files Browse the repository at this point in the history
  • Loading branch information
everysoftware committed Nov 29, 2024
1 parent 6922174 commit 3b03611
Show file tree
Hide file tree
Showing 16 changed files with 266 additions and 76 deletions.
98 changes: 41 additions & 57 deletions src/behavioral/generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

from __future__ import annotations

import io
from abc import ABC, abstractmethod
from sqlite3 import Connection
from types import TracebackType
Expand All @@ -20,22 +19,6 @@
from src.behavioral.iterator import Iterator


class Generator[YieldT, SendT, ReturnT](Iterator[YieldT], ABC):
@abstractmethod
def send(self, value: SendT) -> YieldT: ...

@abstractmethod
def throw(
self,
exc_type: type[BaseException],
exc_val: BaseException | None = None,
tb: TracebackType | None = None,
) -> Self: ...

@abstractmethod
def close(self) -> None: ...


# Basic usage


Expand Down Expand Up @@ -72,9 +55,50 @@ def gen_sum() -> PythonGenerator[int, int, int]:
return total


class CommitException(Exception):
pass


class AbortException(Exception):
pass


def db_session(
db: Connection, sql: str
) -> PythonGenerator[None, tuple[Any, ...], None]:
cursor = db.cursor()
try:
while True:
try:
row = yield
cursor.execute(sql, row)
except CommitException:
db.commit()
except AbortException:
db.rollback()
finally:
db.rollback()


# Class-based generator


class Generator[YieldT, SendT, ReturnT](Iterator[YieldT], ABC):
@abstractmethod
def send(self, value: SendT) -> YieldT: ...

@abstractmethod
def throw(
self,
exc_type: type[BaseException],
exc_val: BaseException | None = None,
tb: TracebackType | None = None,
) -> Self: ...

@abstractmethod
def close(self) -> None: ...


class SumGenerator(Generator[int, int, int]):
def __init__(self) -> None:
self._total = 0
Expand Down Expand Up @@ -102,43 +126,3 @@ def __iter__(self) -> SumGenerator:

def __next__(self) -> int:
return self.send(0)


def gen_line(
output: io.StringIO, state: dict[str, Any]
) -> PythonGenerator[str, None, None]:
# lines
try:
while True:
line = output.readline().rstrip()
if not line:
break
yield line
finally:
state["closed"] = True
output.close()


class CommitException(Exception):
pass


class AbortException(Exception):
pass


def db_session(
db: Connection, sql: str
) -> PythonGenerator[None, tuple[Any, ...], None]:
cursor = db.cursor()
try:
while True:
try:
row = yield
cursor.execute(sql, row)
except CommitException:
db.commit()
except AbortException:
db.rollback()
finally:
db.rollback()
File renamed without changes.
1 change: 1 addition & 0 deletions src/concurrency/examples/generators_sleep.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# @coroutine decorator as asyncio.run
Empty file.
17 changes: 17 additions & 0 deletions src/concurrency/patterns/coroutine.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from abc import abstractmethod, ABC
from typing import Any, Generator


GeneratorCoroutine = Generator


class Awaitable[T](ABC):
@abstractmethod
def __await__(self) -> Generator[Any, Any, T]: ...


class AwaitableCoroutine[YieldT, SendT, ReturnT](
Awaitable[ReturnT], Generator[YieldT, SendT, ReturnT], ABC
):
@abstractmethod
def __await__(self) -> Generator[YieldT, SendT, ReturnT]: ...
100 changes: 99 additions & 1 deletion src/concurrency/patterns/event_loop.py
Original file line number Diff line number Diff line change
@@ -1 +1,99 @@
# https://medium.com/@pekelny/fake-event-loop-python3-7498761af5e0
"""
Event Loop is a concurrency design pattern that is used to handle asynchronous events in a program. It is a loop that
listens for events and then triggers the appropriate event handlers. The Event Loop pattern is commonly used in GUI
applications, web servers, and other programs that need to handle multiple events simultaneously.
A Task is a subclass of Future that represents a coroutine that is running in the event loop. It is used to
manage the execution of the coroutine and handle its result.
"""

from __future__ import annotations

import logging
from queue import Queue
from typing import Any

from src.concurrency.patterns.coroutine import GeneratorCoroutine
from src.concurrency.patterns.future import Future

logger = logging.getLogger(__name__)


class Task[T](Future[T]):
def __init__(
self, gen: GeneratorCoroutine[Any, Any, T], loop: EventLoop
) -> None:
super().__init__()
self.gen: GeneratorCoroutine[Any, Any, T] = gen
self.loop: EventLoop = loop
loop.put(self)

def step(self, value: Any = None) -> None:
# Resume the coroutine
try:
yielded = self.gen.send(value)
# If the coroutine yielded a Future, add a callback to resume the coroutine when the Future is done
if isinstance(yielded, Future):
yielded.add_done_callback(lambda fut: self.step(fut.result()))
else:
self.loop.put(self)
# Coroutine has finished
except StopIteration as e:
self.set_result(e.value)
# Coroutine raised an exception
except Exception as e:
self.set_exception(e)


class EventLoop:
def __init__(self) -> None:
self.q: Queue[Task[Any]] = Queue()

def put(self, task: Task[Any]) -> None:
self.q.put(task)

def run_until_complete[T](
self, coro: GeneratorCoroutine[Any, Any, T]
) -> T:
task = create_task(coro)
while not task.done():
if not self.q.empty():
next_task = self.q.get()
next_task.step()
return task.result()

def close(self) -> None:
self.q.queue.clear()


running_loop: EventLoop | None = None


def new_event_loop() -> EventLoop:
return EventLoop()


def get_running_loop() -> EventLoop | None:
return running_loop


def get_event_loop() -> EventLoop:
loop = get_running_loop()
if loop is not None:
return loop
return new_event_loop()


def create_task[T](coro: GeneratorCoroutine[Any, Any, T]) -> Task[T]:
return Task(coro, get_event_loop())


def run[T](coro: GeneratorCoroutine[Any, Any, T]) -> T:
global running_loop
loop = get_event_loop()
running_loop = loop
try:
return loop.run_until_complete(coro)
finally:
running_loop = None
loop.close()
53 changes: 53 additions & 0 deletions src/concurrency/patterns/future.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
"""
A Future is an object that represents the result of an asynchronous operation. It is used to store the result
of the operation and notify the caller when the operation is complete.
"""

from __future__ import annotations

from typing import Callable, Generator
from src.concurrency.patterns.coroutine import Awaitable


class Future[T](Awaitable[T]):
def __init__(self) -> None:
self._done: bool = False
self._result: T | None = None
self._exception: BaseException | None = None
self._callbacks: list[Callable[[Future[T]], None]] = []

def done(self) -> bool:
return self._done

def result(self) -> T:
if not self._done:
raise RuntimeError("Future is not done yet")
if self._exception:
raise self._exception
return self._result # type: ignore

def set_result(self, result: T) -> None:
self._result = result
self._done = True
self._schedule_callbacks()

def set_exception(self, exception: BaseException) -> None:
self._exception = exception
self._done = True
self._schedule_callbacks()

def add_done_callback(self, callback: Callable[[Future[T]], None]) -> None:
self._callbacks.append(callback)
if self._done:
self._schedule_callbacks()

def _schedule_callbacks(self) -> None:
for callback in self._callbacks:
callback(self)

def __await__(self) -> Generator[Future[T], None, T]:
if not self._done:
yield self
return self.result()

__iter__ = __await__
Empty file removed src/concurrency/patterns/join.py
Empty file.
Empty file removed src/concurrency/patterns/lock.py
Empty file.
Empty file.
Empty file removed src/concurrency/patterns/rwlock.py
Empty file.
Empty file.
Empty file.
Empty file.
18 changes: 0 additions & 18 deletions tests/test_behavioral.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import io
import sqlite3
from typing import Generator as PythonGenerator

Expand All @@ -13,7 +12,6 @@
count_delegation,
gen_sum,
SumGenerator,
gen_line,
db_session,
CommitException,
)
Expand Down Expand Up @@ -316,22 +314,6 @@ def test_gen_sum(impl: type[PythonGenerator[int, int, int]]) -> None:
assert e.value.value == 15


def test_gen_line() -> None:
# A file-like object
output = io.StringIO()
output.write("First line\n")
output.write("Second line\n")
output.write("Third line\n")
output.seek(0)
state = {"closed": False}

g = gen_line(output, state)
assert next(g) == "First line"
assert next(g) == "Second line"
assert next(g) == "Third line"
g.close()


def test_db_session() -> None:
db_url = ":memory:"
conn = sqlite3.connect(db_url)
Expand Down
55 changes: 55 additions & 0 deletions tests/test_concurrency.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import time
from typing import Any

from src.concurrency.patterns.coroutine import (
GeneratorCoroutine,
)
from src.concurrency.patterns.event_loop import run, create_task


# I/O bound task
def sleep(seconds: int) -> GeneratorCoroutine[None, None, None]:
yield
start_time = time.time()
while time.time() - start_time < seconds:
yield


def get(_url: str) -> GeneratorCoroutine[None, None, str]:
yield from sleep(1)
return "OK"


def generate_text() -> GeneratorCoroutine[None, None, str]:
# health check
response = yield from get("https://api.openai.com")
if response != "OK":
return response
# generate text
response = yield from get(
"https://api.openai.com/generate?prompt=what is the meaning of life?"
)
return response


def test_generate_text() -> None:
assert run(generate_text()) == "OK"


def multiple_generate_text(
n: int,
) -> GeneratorCoroutine[Any, None, list[str]]:
yield
# Load tasks in event queue
tasks = [create_task(generate_text()) for _ in range(n)]
responses = []
# Await tasks
for task in tasks:
response = yield from task
responses.append(response)
return responses


def test_multiple_generate_text() -> None:
coro = multiple_generate_text(100)
assert run(coro) == ["OK"] * 100

0 comments on commit 3b03611

Please sign in to comment.