Skip to content

Commit 35969b2

Browse files
committed
reset/reapply_updates
1 parent 092ac9c commit 35969b2

File tree

1 file changed

+164
-0
lines changed

1 file changed

+164
-0
lines changed

reset/reapply_updates.py

Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
import asyncio
2+
import socket
3+
from typing import Iterator, List
4+
from uuid import uuid4
5+
6+
import temporalio.api.common.v1
7+
import temporalio.api.enums.v1
8+
import temporalio.api.history.v1
9+
import temporalio.api.workflowservice.v1
10+
from temporalio import workflow
11+
from temporalio.client import Client, WorkflowHandle
12+
from temporalio.worker import Worker
13+
14+
RunId = str
15+
16+
WORKFLOW_ID = uuid4().hex
17+
TASK_QUEUE = __file__
18+
19+
20+
@workflow.defn
21+
class WorkflowWithUpdateHandler:
22+
def __init__(self) -> None:
23+
self.update_args = []
24+
self.signal_args = []
25+
26+
@workflow.update
27+
async def my_update(self, arg: int):
28+
self.update_args.append(arg)
29+
30+
@workflow.signal
31+
async def my_signal(self, arg: int):
32+
self.signal_args.append(arg)
33+
34+
@workflow.run
35+
async def run(self) -> dict:
36+
await asyncio.sleep(0xFFFF)
37+
await workflow.wait_condition(
38+
lambda: False and bool(self.signal_args and self.update_args)
39+
)
40+
return {"update_args": self.update_args, "signal_args": self.signal_args}
41+
42+
43+
async def app(client: Client):
44+
handle = await client.start_workflow(
45+
WorkflowWithUpdateHandler.run, id=WORKFLOW_ID, task_queue=TASK_QUEUE
46+
)
47+
48+
print(
49+
f"started workflow http://{server()}/namespaces/default/workflows/{WORKFLOW_ID}"
50+
)
51+
52+
if input("send signals?") in ["y", ""]:
53+
for i in range(2):
54+
await handle.signal(WorkflowWithUpdateHandler.my_signal, arg=i)
55+
print(f"sent signal")
56+
57+
if input("execute updates?") in ["y", ""]:
58+
for i in range(2):
59+
await handle.execute_update(WorkflowWithUpdateHandler.my_update, arg=i)
60+
print(f"executed update")
61+
62+
if input("reset?") in ["y", ""]:
63+
history = [e async for e in handle.fetch_history_events()]
64+
reset_to = next_event(
65+
history,
66+
temporalio.api.enums.v1.EventType.EVENT_TYPE_WORKFLOW_TASK_COMPLETED,
67+
)
68+
69+
print(f"about to reset to event {reset_to.event_id}")
70+
run_id = get_first_execution_run_id(history)
71+
new_run_id = await reset_workflow(run_id, reset_to, client)
72+
print(
73+
f"did reset: http://localhost:8080/namespaces/default/workflows/{WORKFLOW_ID}/{new_run_id}"
74+
)
75+
76+
new_handle = client.get_workflow_handle(WORKFLOW_ID, run_id=new_run_id)
77+
78+
history = [e async for e in new_handle.fetch_history_events()]
79+
80+
print("new history")
81+
for e in history:
82+
print(f"{e.event_id} {e.event_type}")
83+
print(e)
84+
85+
await asyncio.sleep(0xFFFF)
86+
wf_result = handle.result()
87+
print(f"wf result: {wf_result}")
88+
89+
90+
async def reset_workflow(
91+
run_id: str,
92+
event: temporalio.api.history.v1.HistoryEvent,
93+
client: Client,
94+
) -> RunId:
95+
resp = await client.workflow_service.reset_workflow_execution(
96+
temporalio.api.workflowservice.v1.ResetWorkflowExecutionRequest(
97+
namespace="default",
98+
workflow_execution=temporalio.api.common.v1.WorkflowExecution(
99+
workflow_id=WORKFLOW_ID,
100+
run_id=run_id,
101+
),
102+
reason="Reset to test update reapply",
103+
request_id="1",
104+
reset_reapply_type=temporalio.api.enums.v1.ResetReapplyType.RESET_REAPPLY_TYPE_UNSPECIFIED, # TODO
105+
workflow_task_finish_event_id=event.event_id,
106+
)
107+
)
108+
assert resp.run_id
109+
return resp.run_id
110+
111+
112+
def next_event(
113+
history: List[temporalio.api.history.v1.HistoryEvent],
114+
event_type: temporalio.api.enums.v1.EventType.ValueType,
115+
) -> temporalio.api.history.v1.HistoryEvent:
116+
return next(e for e in history if e.event_type == event_type)
117+
118+
119+
def get_first_execution_run_id(
120+
history: List[temporalio.api.history.v1.HistoryEvent],
121+
) -> str:
122+
# TODO: correct way to obtain run_id
123+
wf_started_event = next_event(
124+
history, temporalio.api.enums.v1.EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED
125+
)
126+
run_id = (
127+
wf_started_event.workflow_execution_started_event_attributes.first_execution_run_id
128+
)
129+
assert run_id
130+
return run_id
131+
132+
133+
async def main():
134+
client = await Client.connect("localhost:7233")
135+
async with Worker(
136+
client, task_queue=TASK_QUEUE, workflows=[WorkflowWithUpdateHandler]
137+
):
138+
await app(client)
139+
140+
141+
def only(it: Iterator):
142+
t = next(it)
143+
assert next(it, it) == it
144+
return t
145+
146+
147+
def is_listening(addr: str) -> bool:
148+
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
149+
h, p = addr.split(":")
150+
try:
151+
s.connect((h, int(p)))
152+
return True
153+
except socket.error:
154+
return False
155+
finally:
156+
s.close()
157+
158+
159+
def server() -> str:
160+
return only(filter(is_listening, ["localhost:8080", "localhost:8233"]))
161+
162+
163+
if __name__ == "__main__":
164+
asyncio.run(main())

0 commit comments

Comments
 (0)