Skip to content

Commit 70ff06c

Browse files
committedAug 23, 2023
Feat. Added Effects, Pipes, and Forwards
1 parent e82333c commit 70ff06c

File tree

7 files changed

+188
-19
lines changed

7 files changed

+188
-19
lines changed
 

‎README.md

+99-6
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ Python User Language Support for [Spawn](https://github.com/eigr/spawn).
77
2. [Getting Started](#getting-started)
88
3. [Advanced Use Cases](#advanced-use-cases)
99
- [Types of Actors](#types-of-actors)
10-
- [Side Effects](#side-effects)
1110
- [Broadcast](#broadcast)
11+
- [Side Effects](#side-effects)
1212
- [Forward](#forward)
1313
- [Pipe](#pipe)
1414
4. [Using Actors](#using-actors)
@@ -163,9 +163,6 @@ First we need to understand how the various types of actors available in Spawn b
163163

164164
* **Pooled Actors**: Pooled Actors, as the name suggests, are a collection of actors that are grouped under the same name assigned to them at compile time. Pooled actors are generally used when higher performance is needed and are also recommended for handling serverless loads.
165165

166-
### Side Effects
167-
TODO
168-
169166
### Broadcast
170167

171168
Actors in Spawn can subscribe to a thread and receive, as well as broadcast, events for a given thread.
@@ -221,11 +218,107 @@ def set_language(request: Request, ctx: Context) -> Value:
221218
return Value().of(reply, ctx.state).reply()
222219
```
223220

221+
### Side Effects
222+
223+
Actors can also emit side effects to other Actors as part of their response.
224+
See an example:
225+
226+
```python
227+
from domain.domain_pb2 import State, Request, Reply
228+
from spawn.eigr.functions.actors.api.actor import Actor
229+
from spawn.eigr.functions.actors.api.settings import ActorSettings
230+
from spawn.eigr.functions.actors.api.context import Context
231+
from spawn.eigr.functions.actors.api.value import Value
232+
from spawn.eigr.functions.actors.api.workflows.effect import Effect
233+
234+
235+
actor = Actor(settings=ActorSettings(name="joe", stateful=True))
236+
237+
238+
@actor.timer_action(every=1000)
239+
def hi(ctx: Context) -> Value:
240+
new_state = None
241+
request = Request()
242+
request.language = "python"
243+
244+
effect: Effect = Effect(action="setLanguage", payload=request,
245+
system="spawn-system", actor="mike", parent="abs_actor")
246+
247+
if not ctx.state:
248+
new_state = State()
249+
new_state.languages.append("python")
250+
else:
251+
new_state = ctx.state
252+
253+
return Value()\
254+
.effect(effect)\
255+
.state(new_state)\
256+
.noreply()
257+
258+
```
259+
260+
Side effects such as broadcast are not part of the response flow to the caller. They are request-asynchronous events that are emitted after the Actor's state has been saved in memory.
261+
224262
### Forward
225-
TODO
263+
264+
Actors can route some actions to other actors as part of their response. For example, sometimes you may want another Actor to be responsible for processing a message that another Actor has received. We call this forwarding and it occurs when we want to forward the input argument of a request that a specific Actor has received to the input of an action in another Actor.
265+
266+
See an example:
267+
268+
```python
269+
from domain.domain_pb2 import Request
270+
271+
from spawn.eigr.functions.actors.api.actor import Actor
272+
from spawn.eigr.functions.actors.api.settings import ActorSettings
273+
from spawn.eigr.functions.actors.api.context import Context
274+
from spawn.eigr.functions.actors.api.value import Value
275+
from spawn.eigr.functions.actors.api.workflows.forward import Forward
276+
277+
actor = Actor(settings=ActorSettings(name="joe", stateful=True))
278+
279+
@actor.action("setLanguage")
280+
def set_language(request: Request, ctx: Context) -> Value:
281+
return Value()\
282+
.forward(Forward("mike", "setLanguage"))\
283+
.reply()
284+
```
226285

227286
### Pipe
228-
TODO
287+
288+
Similarly, sometimes we want to chain a request through several different processes. For example forwarding an actor's computational output as another actor's input. There is this type of routing we call Pipe, as the name suggests, a pipe forwards what would be the response of the received request to the input of another Action in another Actor.
289+
In the end, just like in a Forward, it is the response of the last Actor in the chain of routing to the original caller.
290+
291+
Example:
292+
293+
```python
294+
from domain.domain_pb2 import State, Request, Reply
295+
296+
from spawn.eigr.functions.actors.api.actor import Actor
297+
from spawn.eigr.functions.actors.api.settings import ActorSettings
298+
from spawn.eigr.functions.actors.api.context import Context
299+
from spawn.eigr.functions.actors.api.value import Value
300+
from spawn.eigr.functions.actors.api.workflows.pipe import Pipe
301+
302+
actor = Actor(settings=ActorSettings(name="joe", stateful=True))
303+
304+
@actor.action("setLanguage")
305+
def set_language(request: Request, ctx: Context) -> Value:
306+
reply = Reply()
307+
reply.language = "python"
308+
309+
if not ctx.state:
310+
new_state = State()
311+
new_state.languages.append("python")
312+
else:
313+
new_state = ctx.state
314+
315+
return Value()\
316+
.response(reply)\
317+
.pipe(Pipe("mike", "setLanguage"))\
318+
.reply()
319+
```
320+
321+
Forwards and pipes do not have an upper thread limit other than the request timeout.
229322

230323
## Using Actors
231324

‎example/joe.py

+2
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from spawn.eigr.functions.actors.api.value import Value
1010
from spawn.eigr.functions.actors.api.workflows.broadcast import Broadcast
1111

12+
1213
actor = Actor(settings=ActorSettings(
1314
name="joe", stateful=True, channel="test"))
1415

@@ -18,6 +19,7 @@ def hi(ctx: Context) -> Value:
1819
new_state = None
1920
request = Request()
2021
request.language = "python"
22+
2123
broadcast = Broadcast()
2224
broadcast.channel = "test"
2325
broadcast.action_name = "setLanguage"

‎spawn/eigr/functions/actors/api/value.py

+14-5
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
# pylint: disable=too-few-public-methods
22
# pylint: disable=arguments-differ
3-
from dataclasses import dataclass
3+
from dataclasses import dataclass, field
44
from enum import Enum
5+
from typing import List
56

67
from spawn.eigr.functions.actors.api.metadata import Metadata
78
from spawn.eigr.functions.actors.api.workflows.broadcast import Broadcast
@@ -22,7 +23,7 @@ class Value():
2223
__response = None
2324
__metadata: Metadata = None
2425
__broadcast: Broadcast = None
25-
__effect: Effect = None
26+
__effects: List[Effect] = field(default_factory=list)
2627
__forward: Forward = None
2728
__pipe: Pipe = None
2829
__reply_kind: ReplyKind = ReplyKind.REPLY
@@ -42,8 +43,8 @@ def get_broadcast(self):
4243
def get_broadcast(self):
4344
return self.__broadcast
4445

45-
def get_effect(self):
46-
return self.__effect
46+
def get_effects(self):
47+
return self.__effects
4748

4849
def get_forward(self):
4950
return self.__forward
@@ -76,14 +77,22 @@ def broadcast(self, broadcast: Broadcast):
7677
return self
7778

7879
def effect(self, effect: Effect):
79-
self.__effect = effect
80+
self.__effects.append(effect)
8081
return self
8182

8283
def forward(self, forward: Forward):
84+
if self.__pipe:
85+
raise Exception(
86+
"You can only use either Pipe or Forward never both")
87+
8388
self.__forward = forward
8489
return self
8590

8691
def pipe(self, pipe: Pipe):
92+
if self.__forward:
93+
raise Exception(
94+
"You can only use either Forward or Pipe never both")
95+
8796
self.__pipe = pipe
8897
return self
8998

Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
11

22
class Effect:
3-
def __init__(self):
4-
self
3+
def __init__(self, action: str, system: str, actor: str, parent: str, payload: any = None):
4+
self.action = action
5+
self.actor = actor
6+
self.system = system
7+
self.parent = parent
8+
self.payload = payload
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11

22
class Forward:
3-
def __init__(self):
4-
pass
3+
def __init__(self, actor: str, action: str):
4+
self.actor = actor
5+
self.action = action
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11

22
class Pipe:
3-
def __init__(self):
4-
pass
3+
def __init__(self, actor: str, action: str):
4+
self.actor = actor
5+
self.action = action

‎spawn/eigr/functions/actors/internal/controller.py

+61-2
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,12 @@
88

99
from spawn.eigr.functions.actors.api.actor import Actor as ActorEntity
1010
from spawn.eigr.functions.actors.api.actor import ActorHandler
11+
1112
from spawn.eigr.functions.actors.api.context import Context as ActorContext
1213
from spawn.eigr.functions.actors.api.value import Value, ReplyKind
1314
from spawn.eigr.functions.actors.api.settings import Kind as ActorKind
1415
from spawn.eigr.functions.actors.internal.client import SpawnClient
16+
from spawn.eigr.functions.actors.api.workflows.effect import Effect
1517

1618
from spawn.eigr.functions.protocol.actors.actor_pb2 import (
1719
Actor,
@@ -32,13 +34,18 @@
3234
from spawn.eigr.functions.protocol.actors.protocol_pb2 import (
3335
ActorInvocation,
3436
ActorInvocationResponse,
37+
Broadcast,
38+
Forward,
3539
Context,
40+
InvocationRequest,
3641
Noop,
42+
Pipe,
3743
RegistrationRequest,
38-
ServiceInfo,
39-
Broadcast
44+
ServiceInfo
4045
)
4146

47+
from spawn.eigr.functions.protocol.actors.protocol_pb2 import SideEffect as SpawnEffect
48+
4249
from google.protobuf import symbol_database as _symbol_database
4350
from google.protobuf.any_pb2 import Any as AnyProto
4451

@@ -100,9 +107,61 @@ def handle_response(system, actor_name, result):
100107
broadcast = handle_broadcast(value_broadcast)
101108
actor_invocation_response.workflow.broadcast.CopyFrom(broadcast)
102109

110+
if result.get_effects():
111+
effects = list(map(lambda ef: to_effect(ef), [
112+
ef for ef in result.get_effects()]))
113+
114+
actor_invocation_response.workflow.effects.extend(effects)
115+
116+
if result.get_forward() != None:
117+
f = result.get_forward()
118+
forward = Forward()
119+
forward.actor = f.actor
120+
forward.action_name = f.action
121+
actor_invocation_response.workflow.forward.CopyFrom(forward)
122+
123+
if result.get_pipe() != None:
124+
p = result.get_pipe()
125+
pipe = Pipe()
126+
pipe.actor = p.actor
127+
pipe.action_name = p.action
128+
actor_invocation_response.workflow.pipe.CopyFrom(pipe)
129+
103130
return actor_invocation_response
104131

105132

133+
def to_effect(ef: Effect):
134+
# We initialize a ref actor here to ensure the target actor exists.
135+
# This costs one more sidecar call but avoids actor not found errors
136+
from spawn.eigr.functions.actors.api.reference import ActorRef
137+
ref = ActorRef(SpawnClient(), ef.system, ef.actor, ef.parent)
138+
139+
req: InvocationRequest = InvocationRequest()
140+
system = ActorSystem()
141+
system.name = ref.actor_system
142+
143+
actor_id = ActorId()
144+
actor_id.name = ref.actor_name
145+
actor_id.system = ref.actor_system
146+
147+
actor = Actor()
148+
actor.id.CopyFrom(actor_id)
149+
150+
req.system.CopyFrom(system)
151+
req.actor.CopyFrom(actor)
152+
req.action_name = ef.action
153+
req.pooled = False
154+
setattr(req, 'async', True)
155+
156+
if ef.payload != None:
157+
req.value.CopyFrom(pack(ef.payload))
158+
159+
side_effect: SpawnEffect = SpawnEffect()
160+
side_effect.request.CopyFrom(req)
161+
162+
return side_effect
163+
164+
106165
def handle_broadcast(value_broadcast):
107166
broadcast = Broadcast()
108167
broadcast.channel_group = value_broadcast.channel

0 commit comments

Comments
 (0)
Please sign in to comment.