Skip to content

Commit

Permalink
🐛 Enhance ddd workers
Browse files Browse the repository at this point in the history
  • Loading branch information
perdy committed Dec 17, 2024
1 parent cc36684 commit 2bed497
Show file tree
Hide file tree
Showing 12 changed files with 189 additions and 113 deletions.
2 changes: 1 addition & 1 deletion flama/ddd/workers/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from flama.ddd.workers.base import * # noqa
from flama.ddd.workers.http import * # noqa
from flama.ddd.workers.noop import * # noqa
from flama.ddd.workers.worker import * # noqa
40 changes: 40 additions & 0 deletions flama/ddd/workers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,3 +137,43 @@ class BaseWorker(AbstractWorker, metaclass=WorkerType):
"""

_repositories: t.ClassVar[dict[str, type[BaseRepository]]]

@abc.abstractmethod
async def set_up(self) -> None:
"""First step in starting a unit of work."""
...

@abc.abstractmethod
async def tear_down(self, *, rollback: bool = False) -> None:
"""Last step in ending a unit of work.
:param rollback: If the unit of work should be rolled back.
"""
...

@abc.abstractmethod
async def repository_params(self) -> tuple[list[t.Any], dict[str, t.Any]]:
"""Get the parameters for initialising the repositories.
:return: Parameters for initialising the repositories.
"""
...

async def begin(self) -> None:
"""Start a unit of work."""
await self.set_up()

args, kwargs = await self.repository_params()

for repository, repository_class in self._repositories.items():
setattr(self, repository, repository_class(*args, **kwargs))

async def end(self, *, rollback: bool = False) -> None:
"""End a unit of work.
:param rollback: If the unit of work should be rolled back.
"""
await self.tear_down(rollback=rollback)

for repository in self._repositories.keys():
delattr(self, repository)
46 changes: 24 additions & 22 deletions flama/ddd/workers/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,39 +38,41 @@ def client(self) -> "Client":
except AttributeError:
raise AttributeError("Client not initialized")

async def begin_transaction(self) -> None:
"""Initialize the client with the URL."""
await self._client.__aenter__()
@client.setter
def client(self, client: "Client") -> None:
"""Set the client to interact with an HTTP resource.
async def end_transaction(self) -> None:
"""Close and delete the client."""
await self.client.__aexit__()
:param client: Flama client.
"""
self._client = client

async def begin(self) -> None:
"""Start a unit of work.
@client.deleter
def client(self) -> None:
"""Delete the client."""
del self._client

Initialize the client, and create the repositories.
"""
async def set_up(self) -> None:
"""Initialize the client with the URL."""
from flama.client import Client

self._client = Client(base_url=self.url, **self._client_kwargs)

await self.begin_transaction()
self.client = Client(base_url=self.url, **self._client_kwargs)

for repository, repository_class in self._repositories.items():
setattr(self, repository, repository_class(self._client))
await self.client.__aenter__()

async def end(self, *, rollback: bool = False) -> None:
"""End a unit of work.
async def tear_down(self, *, rollback: bool = False) -> None:
"""Close and delete the client.
Close the client, and delete the repositories.
:param rollback: If the unit of work should be rolled back.
"""
await self.end_transaction()
await self.client.__aexit__()
del self.client

for repository in self._repositories.keys():
delattr(self, repository)
async def repository_params(self) -> tuple[list[t.Any], dict[str, t.Any]]:
"""Get the parameters for initialising the repositories.
del self._client
:return: Parameters for initialising the repositories.
"""
return [self.client], {}

async def commit(self) -> None:
...
Expand Down
27 changes: 0 additions & 27 deletions flama/ddd/workers/noop.py

This file was deleted.

62 changes: 37 additions & 25 deletions flama/ddd/workers/sqlalchemy.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import typing as t

from flama import exceptions
from flama.ddd.workers.base import BaseWorker
Expand Down Expand Up @@ -37,6 +38,19 @@ def connection(self) -> AsyncConnection:
except AttributeError:
raise AttributeError("Connection not initialized")

@connection.setter
def connection(self, connection: AsyncConnection) -> None:
"""Set the connection to the database.
:param connection: Connection to the database.
"""
self._connection = connection

@connection.deleter
def connection(self) -> None:
"""Delete the connection to the database."""
del self._connection

@property
def transaction(self) -> AsyncTransaction:
"""Database transaction.
Expand All @@ -49,45 +63,43 @@ def transaction(self) -> AsyncTransaction:
except AttributeError:
raise AttributeError("Transaction not started")

async def begin_transaction(self) -> None:
@transaction.setter
def transaction(self, transaction: AsyncTransaction) -> None:
"""Set the transaction.
:param transaction: Database transaction.
"""
self._transaction = transaction

@transaction.deleter
def transaction(self) -> None:
"""Delete the transaction."""
del self._transaction

async def set_up(self) -> None:
"""Open a connection and begin a transaction."""

self._connection = await self.app.sqlalchemy.open_connection()
self._transaction = await self.app.sqlalchemy.begin_transaction(self._connection)
self.connection = await self.app.sqlalchemy.open_connection()
self.transaction = await self.app.sqlalchemy.begin_transaction(self._connection)

async def end_transaction(self, *, rollback: bool = False) -> None:
async def tear_down(self, *, rollback: bool = False) -> None:
"""End a transaction and close the connection.
:param rollback: If the transaction should be rolled back.
:raises AttributeError: If the connection is not initialized or the transaction is not started.
"""
await self.app.sqlalchemy.end_transaction(self.transaction, rollback=rollback)
del self._transaction
del self.transaction

await self.app.sqlalchemy.close_connection(self.connection)
del self._connection
del self.connection

async def begin(self) -> None:
"""Start a unit of work.
async def repository_params(self) -> tuple[list[t.Any], dict[str, t.Any]]:
"""Get the parameters for initialising the repositories.
Initialize the connection, begin a transaction, and create the repositories.
:return: Parameters for initialising the repositories.
"""
await self.begin_transaction()

for repository, repository_class in self._repositories.items():
setattr(self, repository, repository_class(self.connection))

async def end(self, *, rollback: bool = False) -> None:
"""End a unit of work.
Close the connection, commit or rollback the transaction, and delete the repositories.
:param rollback: If the unit of work should be rolled back.
"""
await self.end_transaction(rollback=rollback)

for repository in self._repositories.keys():
delattr(self, repository)
return [self.connection], {}

async def commit(self):
"""Commit the unit of work."""
Expand Down
38 changes: 38 additions & 0 deletions flama/ddd/workers/worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import typing as t

from flama.ddd.workers.base import BaseWorker

__all__ = ["Worker"]


class Worker(BaseWorker):
"""Worker that does not apply any specific behavior.
A basic implementation of the worker class that does not apply any specific behavior.
"""

async def set_up(self) -> None:
"""First step in starting a unit of work."""
...

async def tear_down(self, *, rollback: bool = False) -> None:
"""Last step in ending a unit of work.
:param rollback: If the unit of work should be rolled back.
"""
...

async def repository_params(self) -> tuple[list[t.Any], dict[str, t.Any]]:
"""Get the parameters for initialising the repositories.
:return: Parameters for initialising the repositories.
"""
return [], {}

async def commit(self) -> None:
"""Commit the unit of work."""
...

async def rollback(self) -> None:
"""Rollback the unit of work."""
...
11 changes: 6 additions & 5 deletions flama/resources/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ class Repositories:
registered: dict[str, type["SQLAlchemyTableRepository"]] = dataclasses.field(default_factory=dict)
initialised: t.Optional[dict[str, "SQLAlchemyTableRepository"]] = None

def init(self, connection: t.Any) -> None:
self.initialised = {r: cls(connection) for r, cls in self.registered.items()}
def init(self, *args: t.Any, **kwargs: t.Any) -> None:
self.initialised = {r: cls(*args, **kwargs) for r, cls in self.registered.items()}

def delete(self) -> None:
self.initialised = None
Expand Down Expand Up @@ -67,8 +67,9 @@ async def begin(self) -> None:
Initialize the connection, begin a transaction, and create the repositories.
"""
await self.begin_transaction()
self._resources_repositories.init(self.connection)
await self.set_up()
args, kwargs = await self.repository_params()
self._resources_repositories.init(*args, **kwargs)

async def end(self, *, rollback: bool = False) -> None:
"""End a unit of work.
Expand All @@ -77,5 +78,5 @@ async def end(self, *, rollback: bool = False) -> None:
:param rollback: If the unit of work should be rolled back.
"""
await self.end_transaction(rollback=rollback)
await self.tear_down(rollback=rollback)
self._resources_repositories.delete()
7 changes: 5 additions & 2 deletions tests/ddd/test_components.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@ def worker(self, repository):
class FooWorker(BaseWorker):
foo: repository

async def begin(self):
async def set_up(self):
...

async def end(self, *, rollback: bool = False):
async def tear_down(self, *, rollback: bool = False):
...

async def repository_params(self):
return [], {}

async def commit(self):
...

Expand Down
9 changes: 9 additions & 0 deletions tests/ddd/workers/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,15 @@ def worker(self, repository):
class FooWorker(BaseWorker):
foo: repository

async def set_up(self):
...

async def tear_down(self, *, rollback: bool = False):
...

async def repository_params(self):
return [], {}

async def begin(self):
...

Expand Down
Loading

0 comments on commit 2bed497

Please sign in to comment.