Skip to content

Redesign managing actor #95

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 30 additions & 18 deletions src/frequenz/dispatch/_managing_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from dataclasses import dataclass
from typing import Any, Set

from frequenz.channels import Receiver, Sender
from frequenz.channels import Broadcast, Receiver
from frequenz.client.dispatch.types import TargetComponents
from frequenz.sdk.actor import Actor

Expand Down Expand Up @@ -85,9 +85,6 @@ async def run():
key=key
)

# Create update channel to receive dispatch update events pre-start and mid-run
dispatch_updates_channel = Broadcast[DispatchUpdate](name="dispatch_updates_channel")

# Start actor and give it an dispatch updates channel receiver
my_actor = MyActor(dispatch_updates_channel.new_receiver())

Expand All @@ -106,26 +103,45 @@ async def run():

def __init__(
self,
actor: Actor | Set[Actor],
dispatch_type: str,
running_status_receiver: Receiver[Dispatch],
updates_sender: Sender[DispatchUpdate] | None = None,
actor: Actor | Set[Actor] | None = None,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was going for an actor factory. Will submit my WIP as a draft PR so maybe you can get some inspiration about it, or we can discuss pros and cons from the different approaches.

) -> None:
"""Initialize the dispatch handler.

Args:
actor: A set of actors or a single actor to manage.
dispatch_type: The type of dispatches to handle.
running_status_receiver: The receiver for dispatch running status changes.
updates_sender: The sender for dispatch events
actor: Optional, but should be set later in set_actor(). A set of
actors or a single actor to manage.
"""
super().__init__()
self._dispatch_rx = running_status_receiver
self._actors: frozenset[Actor] = frozenset(
[actor] if isinstance(actor, Actor) else actor
self._actors: frozenset[Actor] = (
frozenset()
if actor is None
else frozenset([actor] if isinstance(actor, Actor) else actor)
)
self._dispatch_type = dispatch_type
self._updates_sender = updates_sender
self._updates_channel = Broadcast[DispatchUpdate](
name="dispatch_updates_channel", resend_latest=True
)
self._updates_sender = self._updates_channel.new_sender()

def set_actor(self, actor: Actor | Set[Actor]) -> None:
"""Set the actor to manage.

Args:
actor: A set of actors or a single actor to manage.
"""
self._actors = (
frozenset([actor]) if isinstance(actor, Actor) else frozenset(actor)
)

def new_receiver(self) -> Receiver[DispatchUpdate]:
"""Create a new receiver for dispatch updates.

Returns:
A new receiver for dispatch updates.
"""
return self._updates_channel.new_receiver()
Comment on lines +138 to +144
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, yes! I also thought about this, but I didn't start to address this but was my next step. I think this actor should also stop being an actor and just be a background service.

I'm not sure it is needed with my approach, but I think it would be convenient anyway, I see no advantages in delegating the creation of the channel to the outside.


def _start_actors(self) -> None:
"""Start all actors."""
Expand Down Expand Up @@ -158,10 +174,6 @@ async def _handle_dispatch(self, dispatch: Dispatch) -> None:
Args:
dispatch: The dispatch to handle.
"""
if dispatch.type != self._dispatch_type:
_logger.debug("Ignoring dispatch %s", dispatch.id)
return

if dispatch.started:
if self._updates_sender is not None:
_logger.info("Updated by dispatch %s", dispatch.id)
Expand Down
Loading