Skip to content

Commit 42b3e45

Browse files
Run retry e2e tests (#22)
* Make sure we fill the identity key. Fix #16 * Update to test suite 2.0 * Make dockerfile more cacheable
1 parent 05dbb3c commit 42b3e45

File tree

11 files changed

+117
-43
lines changed

11 files changed

+117
-43
lines changed

.dockerignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,5 @@
1414

1515
**/test_report/
1616

17+
target
18+

.github/workflows/integration.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ jobs:
4141
name: "Features integration test (sdk-test-suite version ${{ matrix.sdk-test-suite }})"
4242
strategy:
4343
matrix:
44-
sdk-test-suite: [ "1.5" ]
44+
sdk-test-suite: [ "2.0" ]
4545
permissions:
4646
contents: read
4747
issues: read

test-services/Dockerfile

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,14 @@ FROM ghcr.io/pyo3/maturin AS build-sdk
44

55
WORKDIR /usr/src/app
66

7-
COPY . .
7+
COPY src ./src/
8+
COPY python ./python/
9+
COPY Cargo.lock .
10+
COPY Cargo.toml .
11+
COPY rust-toolchain.toml .
12+
COPY requirements.txt .
13+
COPY pyproject.toml .
14+
COPY LICENSE .
815

916
RUN maturin build --out dist --interpreter python3.12
1017

test-services/services/__init__.py

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,17 +11,17 @@
1111
from typing import Dict, Union
1212
from restate import Service, VirtualObject, Workflow
1313

14-
from .counter import counter_object
15-
from .proxy import proxy
16-
from .awakable_holder import awakeable_holder
17-
from. block_and_wait_workflow import workflow
18-
from .cancel_test import runner, blocking_service
19-
from .failing import failing
20-
from .kill_test import kill_runner, kill_singleton
21-
from .list_object import list_object
22-
from .map_object import map_object
23-
from .non_determinism import non_deterministic
24-
from .test_utils import test_utils
14+
from .counter import counter_object as s1
15+
from .proxy import proxy as s2
16+
from .awakeable_holder import awakeable_holder as s3
17+
from. block_and_wait_workflow import workflow as s4
18+
from .cancel_test import runner, blocking_service as s5
19+
from .failing import failing as s6
20+
from .kill_test import kill_runner, kill_singleton as s7
21+
from .list_object import list_object as s8
22+
from .map_object import map_object as s9
23+
from .non_determinism import non_deterministic as s10
24+
from .test_utils import test_utils as s11
2525

2626
def list_services(bindings):
2727
"""List all services in this module"""

test-services/services/cancel_test.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
from restate import VirtualObject, ObjectContext
1919
from restate.exceptions import TerminalError
2020

21-
from . import awakable_holder
21+
from . import awakeable_holder
2222

2323
BlockingOperation = Literal["CALL", "SLEEP", "AWAKEABLE"]
2424

@@ -47,7 +47,7 @@ async def verify_test(ctx: ObjectContext) -> bool:
4747
@blocking_service.handler()
4848
async def block(ctx: ObjectContext, op: BlockingOperation):
4949
name, awakeable = ctx.awakeable()
50-
await ctx.object_call(awakable_holder.hold, key="cancel", arg=name)
50+
await ctx.object_call(awakeable_holder.hold, key="cancel", arg=name)
5151
await awakeable
5252

5353
if op == "CALL":

test-services/services/failing.py

Lines changed: 31 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -39,28 +39,44 @@ async def failing_call_with_eventual_success(ctx: ObjectContext) -> int:
3939
return 4
4040
raise ValueError(f"Failed at attempt: {failures}")
4141

42+
@failing.handler(name="terminallyFailingSideEffect")
43+
async def terminally_failing_side_effect(ctx: ObjectContext):
44+
45+
def side_effect():
46+
raise TerminalError(message="failed side effect")
47+
48+
await ctx.run("sideEffect", side_effect)
49+
raise ValueError("Should not reach here")
4250

43-
side_effect_failures = 0
4451

45-
@failing.handler(name="failingSideEffectWithEventualSuccess")
46-
async def failing_side_effect_with_eventual_success(ctx: ObjectContext) -> int:
52+
eventual_success_side_effects = 0
53+
54+
@failing.handler(name="sideEffectSucceedsAfterGivenAttempts")
55+
async def side_effect_succeeds_after_given_attempts(ctx: ObjectContext, minimum_attempts: int) -> int:
4756

4857
def side_effect():
49-
global side_effect_failures
50-
side_effect_failures += 1
51-
if side_effect_failures >= 4:
52-
side_effect_failures = 0
53-
return 4
54-
raise ValueError(f"Failed at attempt: {side_effect_failures}")
58+
global eventual_success_side_effects
59+
eventual_success_side_effects += 1
60+
if eventual_success_side_effects >= minimum_attempts:
61+
return eventual_success_side_effects
62+
raise ValueError(f"Failed at attempt: {eventual_success_side_effects}")
5563

56-
return await ctx.run("sideEffect", side_effect) # type: ignore
64+
return await ctx.run("sideEffect", side_effect, max_attempts=minimum_attempts + 1) # type: ignore
5765

66+
eventual_failure_side_effects = 0
5867

59-
@failing.handler(name="terminallyFailingSideEffect")
60-
async def terminally_failing_side_effect(ctx: ObjectContext):
68+
@failing.handler(name="sideEffectFailsAfterGivenAttempts")
69+
async def side_effect_fails_after_given_attempts(ctx: ObjectContext, retry_policy_max_retry_count: int) -> int:
6170

6271
def side_effect():
63-
raise TerminalError(message="failed side effect")
72+
global eventual_failure_side_effects
73+
eventual_failure_side_effects += 1
74+
raise ValueError(f"Failed at attempt: {eventual_failure_side_effects}")
75+
76+
try:
77+
await ctx.run("sideEffect", side_effect, max_attempts=retry_policy_max_retry_count)
78+
raise ValueError("Side effect did not fail.")
79+
except TerminalError as t:
80+
global eventual_failure_side_effects
81+
return eventual_failure_side_effects
6482

65-
await ctx.run("sideEffect", side_effect)
66-
raise ValueError("Should not reach here")

test-services/services/kill_test.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
from restate import Service, Context, VirtualObject, ObjectContext
1616

17-
from . import awakable_holder
17+
from . import awakeable_holder
1818

1919
kill_runner = Service("KillTestRunner")
2020

@@ -27,7 +27,7 @@ async def start_call_tree(ctx: Context):
2727
@kill_singleton.handler(name="recursiveCall")
2828
async def recursive_call(ctx: ObjectContext):
2929
name, promise = ctx.awakeable()
30-
ctx.object_send(awakable_holder.hold, key="kill", arg=name)
30+
ctx.object_send(awakeable_holder.hold, key="kill", arg=name)
3131
await promise
3232

3333
await ctx.object_call(recursive_call, key="", arg=None)

test-services/services/proxy.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
# pylint: disable=C0116
1313
# pylint: disable=W0613
1414

15+
from datetime import timedelta
1516
from restate import Service, Context
1617
from typing import TypedDict, Optional, Iterable
1718

@@ -23,6 +24,7 @@ class ProxyRequest(TypedDict):
2324
virtualObjectKey: Optional[str]
2425
handlerName: str
2526
message: Iterable[int]
27+
delayMillis: Optional[int]
2628

2729

2830
@proxy.handler()
@@ -31,16 +33,21 @@ async def call(ctx: Context, req: ProxyRequest) -> Iterable[int]:
3133
req['serviceName'],
3234
req['handlerName'],
3335
bytes(req['message']),
34-
req['virtualObjectKey']))
36+
req.get('virtualObjectKey')))
3537

3638

3739
@proxy.handler(name="oneWayCall")
3840
async def one_way_call(ctx: Context, req: ProxyRequest):
41+
send_delay = None
42+
if req.get('delayMillis'):
43+
send_delay = timedelta(milliseconds=req['delayMillis'])
3944
ctx.generic_send(
4045
req['serviceName'],
4146
req['handlerName'],
4247
bytes(req['message']),
43-
req['virtualObjectKey'])
48+
req.get('virtualObjectKey'),
49+
send_delay
50+
)
4451

4552

4653
class ManyCallRequest(TypedDict):
@@ -54,17 +61,22 @@ async def many_calls(ctx: Context, requests: Iterable[ManyCallRequest]):
5461

5562
for req in requests:
5663
if req['oneWayCall']:
64+
send_delay = None
65+
if req['proxyRequest'].get('delayMillis'):
66+
send_delay = timedelta(milliseconds=req['proxyRequest']['delayMillis'])
5767
ctx.generic_send(
5868
req['proxyRequest']['serviceName'],
5969
req['proxyRequest']['handlerName'],
6070
bytes(req['proxyRequest']['message']),
61-
req['proxyRequest']['virtualObjectKey'])
71+
req['proxyRequest'].get('virtualObjectKey'),
72+
send_delay
73+
)
6274
else:
6375
awaitable = ctx.generic_call(
6476
req['proxyRequest']['serviceName'],
6577
req['proxyRequest']['handlerName'],
6678
bytes(req['proxyRequest']['message']),
67-
req['proxyRequest']['virtualObjectKey'])
79+
req['proxyRequest'].get('virtualObjectKey'))
6880
if req['awaitAtTheEnd']:
6981
to_await.append(awaitable)
7082

test-services/services/test_utils.py

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,13 @@
1212
# pylint: disable=C0116
1313
# pylint: disable=W0613
1414

15+
import os
1516
from datetime import timedelta
16-
from typing import Dict, Any
17-
import typing
17+
from typing import (Dict, Iterable, List, Union, TypedDict, Literal, Any)
1818
from restate import Service, Context
1919

20-
from . import awakable_holder
20+
from . import list_object
21+
from . import awakeable_holder
2122

2223
test_utils = Service("TestUtilsService")
2324

@@ -37,7 +38,7 @@ async def echo_headers(context: Context) -> Dict[str, str]:
3738
async def create_awakeable_and_await_it(context: Context, req: Dict[str, Any]) -> Dict[str, Any]:
3839
name, awakeable = context.awakeable()
3940

40-
await context.object_call(awakable_holder.hold, key=req["awakeableKey"], arg=name)
41+
await context.object_call(awakeable_holder.hold, key=req["awakeableKey"], arg=name)
4142

4243
if "awaitTimeout" not in req:
4344
return {"type": "result", "value": await awakeable}
@@ -46,7 +47,7 @@ async def create_awakeable_and_await_it(context: Context, req: Dict[str, Any]) -
4647
raise NotImplementedError()
4748

4849
@test_utils.handler(name="sleepConcurrently")
49-
async def sleep_concurrently(context: Context, millis_duration: typing.List[int]) -> None:
50+
async def sleep_concurrently(context: Context, millis_duration: List[int]) -> None:
5051
timers = [context.sleep(timedelta(milliseconds=duration)) for duration in millis_duration]
5152

5253
for timer in timers:
@@ -65,4 +66,35 @@ def effect():
6566
await context.run("count", effect)
6667

6768
return invoked_side_effects
68-
69+
70+
@test_utils.handler(name="getEnvVariable")
71+
async def get_env_variable(context: Context, env_name: str) -> str:
72+
return os.environ.get(env_name, default="")
73+
74+
class CreateAwakeableAndAwaitIt(TypedDict):
75+
type: Literal["createAwakeableAndAwaitIt"]
76+
awakeableKey: str
77+
78+
class GetEnvVariable(TypedDict):
79+
type: Literal["getEnvVariable"]
80+
envName: str
81+
82+
Command = Union[
83+
CreateAwakeableAndAwaitIt,
84+
GetEnvVariable
85+
]
86+
87+
class InterpretRequest(TypedDict):
88+
listName: str
89+
commands: Iterable[Command]
90+
91+
@test_utils.handler(name="interpretCommands")
92+
async def interpret_commands(context: Context, req: InterpretRequest):
93+
for cmd in req['commands']:
94+
if cmd['type'] == "createAwakeableAndAwaitIt":
95+
name, awakeable = context.awakeable()
96+
context.object_send(awakeable_holder.hold, key=cmd["awakeableKey"], arg=name)
97+
result = await awakeable
98+
context.object_send(list_object.append, key=req['listName'], arg=result)
99+
elif cmd['type'] == "getEnvVariable":
100+
context.object_send(list_object.append, key=req['listName'], arg=os.environ.get(cmd['envName'], default=""))

0 commit comments

Comments
 (0)