Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Enhance ddd workers #167

Merged
merged 1 commit into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion flama/ddd/workers/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from flama.ddd.workers.base import * # noqa
from flama.ddd.workers.http import * # noqa
from flama.ddd.workers.noop import * # noqa
from flama.ddd.workers.worker import * # noqa
40 changes: 40 additions & 0 deletions flama/ddd/workers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,3 +137,43 @@ class BaseWorker(AbstractWorker, metaclass=WorkerType):
"""

_repositories: t.ClassVar[dict[str, type[BaseRepository]]]

@abc.abstractmethod
async def set_up(self) -> None:
"""First step in starting a unit of work."""
...

@abc.abstractmethod
async def tear_down(self, *, rollback: bool = False) -> None:
"""Last step in ending a unit of work.

:param rollback: If the unit of work should be rolled back.
"""
...

@abc.abstractmethod
async def repository_params(self) -> tuple[list[t.Any], dict[str, t.Any]]:
"""Get the parameters for initialising the repositories.

:return: Parameters for initialising the repositories.
"""
...

async def begin(self) -> None:
"""Start a unit of work."""
await self.set_up()

args, kwargs = await self.repository_params()

for repository, repository_class in self._repositories.items():
setattr(self, repository, repository_class(*args, **kwargs))

async def end(self, *, rollback: bool = False) -> None:
"""End a unit of work.

:param rollback: If the unit of work should be rolled back.
"""
await self.tear_down(rollback=rollback)

for repository in self._repositories.keys():
delattr(self, repository)
46 changes: 24 additions & 22 deletions flama/ddd/workers/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,39 +38,41 @@ def client(self) -> "Client":
except AttributeError:
raise AttributeError("Client not initialized")

async def begin_transaction(self) -> None:
"""Initialize the client with the URL."""
await self._client.__aenter__()
@client.setter
def client(self, client: "Client") -> None:
"""Set the client to interact with an HTTP resource.

async def end_transaction(self) -> None:
"""Close and delete the client."""
await self.client.__aexit__()
:param client: Flama client.
"""
self._client = client

async def begin(self) -> None:
"""Start a unit of work.
@client.deleter
def client(self) -> None:
"""Delete the client."""
del self._client

Initialize the client, and create the repositories.
"""
async def set_up(self) -> None:
"""Initialize the client with the URL."""
from flama.client import Client

self._client = Client(base_url=self.url, **self._client_kwargs)

await self.begin_transaction()
self.client = Client(base_url=self.url, **self._client_kwargs)

for repository, repository_class in self._repositories.items():
setattr(self, repository, repository_class(self._client))
await self.client.__aenter__()

async def end(self, *, rollback: bool = False) -> None:
"""End a unit of work.
async def tear_down(self, *, rollback: bool = False) -> None:
"""Close and delete the client.

Close the client, and delete the repositories.
:param rollback: If the unit of work should be rolled back.
"""
await self.end_transaction()
await self.client.__aexit__()
del self.client

for repository in self._repositories.keys():
delattr(self, repository)
async def repository_params(self) -> tuple[list[t.Any], dict[str, t.Any]]:
"""Get the parameters for initialising the repositories.

del self._client
:return: Parameters for initialising the repositories.
"""
return [self.client], {}

async def commit(self) -> None:
...
Expand Down
27 changes: 0 additions & 27 deletions flama/ddd/workers/noop.py

This file was deleted.

62 changes: 37 additions & 25 deletions flama/ddd/workers/sqlalchemy.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import typing as t

from flama import exceptions
from flama.ddd.workers.base import BaseWorker
Expand Down Expand Up @@ -37,6 +38,19 @@ def connection(self) -> AsyncConnection:
except AttributeError:
raise AttributeError("Connection not initialized")

@connection.setter
def connection(self, connection: AsyncConnection) -> None:
"""Set the connection to the database.

:param connection: Connection to the database.
"""
self._connection = connection

@connection.deleter
def connection(self) -> None:
"""Delete the connection to the database."""
del self._connection

@property
def transaction(self) -> AsyncTransaction:
"""Database transaction.
Expand All @@ -49,45 +63,43 @@ def transaction(self) -> AsyncTransaction:
except AttributeError:
raise AttributeError("Transaction not started")

async def begin_transaction(self) -> None:
@transaction.setter
def transaction(self, transaction: AsyncTransaction) -> None:
"""Set the transaction.

:param transaction: Database transaction.
"""
self._transaction = transaction

@transaction.deleter
def transaction(self) -> None:
"""Delete the transaction."""
del self._transaction

async def set_up(self) -> None:
"""Open a connection and begin a transaction."""

self._connection = await self.app.sqlalchemy.open_connection()
self._transaction = await self.app.sqlalchemy.begin_transaction(self._connection)
self.connection = await self.app.sqlalchemy.open_connection()
self.transaction = await self.app.sqlalchemy.begin_transaction(self._connection)

async def end_transaction(self, *, rollback: bool = False) -> None:
async def tear_down(self, *, rollback: bool = False) -> None:
"""End a transaction and close the connection.

:param rollback: If the transaction should be rolled back.
:raises AttributeError: If the connection is not initialized or the transaction is not started.
"""
await self.app.sqlalchemy.end_transaction(self.transaction, rollback=rollback)
del self._transaction
del self.transaction

await self.app.sqlalchemy.close_connection(self.connection)
del self._connection
del self.connection

async def begin(self) -> None:
"""Start a unit of work.
async def repository_params(self) -> tuple[list[t.Any], dict[str, t.Any]]:
"""Get the parameters for initialising the repositories.

Initialize the connection, begin a transaction, and create the repositories.
:return: Parameters for initialising the repositories.
"""
await self.begin_transaction()

for repository, repository_class in self._repositories.items():
setattr(self, repository, repository_class(self.connection))

async def end(self, *, rollback: bool = False) -> None:
"""End a unit of work.

Close the connection, commit or rollback the transaction, and delete the repositories.

:param rollback: If the unit of work should be rolled back.
"""
await self.end_transaction(rollback=rollback)

for repository in self._repositories.keys():
delattr(self, repository)
return [self.connection], {}

async def commit(self):
"""Commit the unit of work."""
Expand Down
38 changes: 38 additions & 0 deletions flama/ddd/workers/worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import typing as t

from flama.ddd.workers.base import BaseWorker

__all__ = ["Worker"]


class Worker(BaseWorker):
"""Worker that does not apply any specific behavior.

A basic implementation of the worker class that does not apply any specific behavior.
"""

async def set_up(self) -> None:
"""First step in starting a unit of work."""
...

async def tear_down(self, *, rollback: bool = False) -> None:
"""Last step in ending a unit of work.

:param rollback: If the unit of work should be rolled back.
"""
...

async def repository_params(self) -> tuple[list[t.Any], dict[str, t.Any]]:
"""Get the parameters for initialising the repositories.

:return: Parameters for initialising the repositories.
"""
return [], {}

async def commit(self) -> None:
"""Commit the unit of work."""
...

async def rollback(self) -> None:
"""Rollback the unit of work."""
...
11 changes: 6 additions & 5 deletions flama/resources/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ class Repositories:
registered: dict[str, type["SQLAlchemyTableRepository"]] = dataclasses.field(default_factory=dict)
initialised: t.Optional[dict[str, "SQLAlchemyTableRepository"]] = None

def init(self, connection: t.Any) -> None:
self.initialised = {r: cls(connection) for r, cls in self.registered.items()}
def init(self, *args: t.Any, **kwargs: t.Any) -> None:
self.initialised = {r: cls(*args, **kwargs) for r, cls in self.registered.items()}

def delete(self) -> None:
self.initialised = None
Expand Down Expand Up @@ -67,8 +67,9 @@ async def begin(self) -> None:

Initialize the connection, begin a transaction, and create the repositories.
"""
await self.begin_transaction()
self._resources_repositories.init(self.connection)
await self.set_up()
args, kwargs = await self.repository_params()
self._resources_repositories.init(*args, **kwargs)

async def end(self, *, rollback: bool = False) -> None:
"""End a unit of work.
Expand All @@ -77,5 +78,5 @@ async def end(self, *, rollback: bool = False) -> None:

:param rollback: If the unit of work should be rolled back.
"""
await self.end_transaction(rollback=rollback)
await self.tear_down(rollback=rollback)
self._resources_repositories.delete()
7 changes: 5 additions & 2 deletions tests/ddd/test_components.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@ def worker(self, repository):
class FooWorker(BaseWorker):
foo: repository

async def begin(self):
async def set_up(self):
...

async def end(self, *, rollback: bool = False):
async def tear_down(self, *, rollback: bool = False):
...

async def repository_params(self):
return [], {}

async def commit(self):
...

Expand Down
9 changes: 9 additions & 0 deletions tests/ddd/workers/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,15 @@ def worker(self, repository):
class FooWorker(BaseWorker):
foo: repository

async def set_up(self):
...

async def tear_down(self, *, rollback: bool = False):
...

async def repository_params(self):
return [], {}

async def begin(self):
...

Expand Down
Loading
Loading