Skip to content

Commit 1397704

Browse files
committed
Versioning behavior passed through properly / definition changes
1 parent 3c70c30 commit 1397704

File tree

17 files changed

+242
-82
lines changed

17 files changed

+242
-82
lines changed

packages/common/src/index.ts

+1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ export * from './logger';
2222
export * from './retry-policy';
2323
export type { Timestamp, Duration, StringValue } from './time';
2424
export * from './worker-deployments';
25+
export * from './workflow-definition-options';
2526
export * from './workflow-handle';
2627
export * from './workflow-options';
2728
export * from './versioning-intent';

packages/common/src/interfaces.ts

+1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import type { temporal } from '@temporalio/proto';
2+
import { WorkflowFunctionWithOptions } from './workflow-definition-options';
23

34
export type Payload = temporal.api.common.v1.IPayload;
45

packages/common/src/worker-deployments.ts

+6-2
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,12 @@
44
* @experimental Worker deployments are experimental
55
*/
66
export interface WorkerDeploymentVersion {
7-
buildId: string;
8-
deploymentName: string;
7+
readonly buildId: string;
8+
readonly deploymentName: string;
9+
}
10+
11+
export function toCanonicalString(version: WorkerDeploymentVersion): string {
12+
return `${version.deploymentName}.${version.buildId}`;
913
}
1014

1115
export type VersioningBehavior = 'pinned' | 'auto-upgrade';
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
import { VersioningBehavior } from './worker-deployments';
2+
3+
/**
4+
* Options that can be used when defining a workflow via {@link defineWorkflowWithOptions}.
5+
*/
6+
export interface WorkflowDefinitionOptions {
7+
versioningBehavior?: VersioningBehavior;
8+
}
9+
10+
type AsyncFunction<Args extends any[], ReturnType> = (...args: Args) => Promise<ReturnType>;
11+
12+
/**
13+
* A workflow function that has been defined with options from {@link WorkflowDefinitionOptions}.
14+
*/
15+
export interface WorkflowFunctionWithOptions<Args extends any[], ReturnType> extends AsyncFunction<Args, ReturnType> {
16+
__temporal_is_workflow_function_with_options: true;
17+
options: WorkflowDefinitionOptions;
18+
}

packages/common/src/workflow-options.ts

+8-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { RetryPolicy } from './retry-policy';
44
import { Duration } from './time';
55
import { makeProtoEnumConverters } from './internal-workflow';
66
import { SearchAttributePair, SearchAttributes, TypedSearchAttributes } from './search-attributes';
7+
import { WorkflowFunctionWithOptions } from './workflow-definition-options';
78

89
/**
910
* Defines what happens when trying to start a Workflow with the same ID as a *Closed* Workflow.
@@ -237,7 +238,9 @@ export interface WorkflowDurationOptions {
237238

238239
export type CommonWorkflowOptions = BaseWorkflowOptions & WorkflowDurationOptions;
239240

240-
export function extractWorkflowType<T extends Workflow>(workflowTypeOrFunc: string | T): string {
241+
export function extractWorkflowType<T extends Workflow>(
242+
workflowTypeOrFunc: string | T | WorkflowFunctionWithOptions<any[], any>
243+
): string {
241244
if (typeof workflowTypeOrFunc === 'string') return workflowTypeOrFunc as string;
242245
if (typeof workflowTypeOrFunc === 'function') {
243246
if (workflowTypeOrFunc?.name) return workflowTypeOrFunc.name;
@@ -247,3 +250,7 @@ export function extractWorkflowType<T extends Workflow>(workflowTypeOrFunc: stri
247250
`Invalid workflow type: expected either a string or a function, got '${typeof workflowTypeOrFunc}'`
248251
);
249252
}
253+
254+
export function isWorkflowFunctionWithOptions(obj: any): obj is WorkflowFunctionWithOptions<any[], any> {
255+
return obj.__temporal_is_workflow_function_with_options === true;
256+
}

packages/core-bridge/src/conversions.rs

-2
Original file line numberDiff line numberDiff line change
@@ -560,8 +560,6 @@ impl ObjectHandleConversionsExt for Handle<'_, JsObject> {
560560
}
561561
};
562562

563-
dbg!("!!!! versioning strategy is: {:?}", &versioning_strategy);
564-
565563
match WorkerConfigBuilder::default()
566564
.versioning_strategy(versioning_strategy)
567565
.client_identity_override(Some(js_value_getter!(cx, self, "identity", JsString)))
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
import { CancelledFailure, setHandler, condition, defineWorkflowWithOptions } from '@temporalio/workflow';
2+
import { unblockSignal, versionQuery } from '../workflows';
3+
4+
defineWorkflowWithOptions({ versioningBehavior: 'auto-upgrade' }, deploymentVersioning);
5+
export async function deploymentVersioning(): Promise<string> {
6+
let doFinish = false;
7+
setHandler(unblockSignal, () => void (doFinish = true));
8+
setHandler(versionQuery, () => 'v1');
9+
await condition(() => doFinish);
10+
return 'version-v1';
11+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
import { CancelledFailure, setHandler, condition, defineWorkflowWithOptions } from '@temporalio/workflow';
2+
import { unblockSignal, versionQuery } from '../workflows';
3+
4+
defineWorkflowWithOptions({ versioningBehavior: 'pinned' }, deploymentVersioning);
5+
export async function deploymentVersioning(): Promise<string> {
6+
let doFinish = false;
7+
setHandler(unblockSignal, () => void (doFinish = true));
8+
setHandler(versionQuery, () => 'v2');
9+
await condition(() => doFinish);
10+
return 'version-v2';
11+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
import { CancelledFailure, setHandler, condition, defineWorkflowWithOptions } from '@temporalio/workflow';
2+
import { unblockSignal, versionQuery } from '../workflows';
3+
4+
defineWorkflowWithOptions({ versioningBehavior: 'auto-upgrade' }, deploymentVersioning);
5+
export async function deploymentVersioning(): Promise<string> {
6+
let doFinish = false;
7+
setHandler(unblockSignal, () => void (doFinish = true));
8+
setHandler(versionQuery, () => 'v3');
9+
await condition(() => doFinish);
10+
return 'version-v3';
11+
}

packages/test/src/test-worker-deployment-versioning.ts

+147-34
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,9 @@ import asyncRetry from 'async-retry';
99
import { Client } from '@temporalio/client';
1010
import { Worker } from './helpers';
1111
import * as activities from './activities';
12-
import { WorkerDeploymentVersion } from '@temporalio/common';
12+
import { toCanonicalString, WorkerDeploymentVersion } from '@temporalio/common';
1313
import { makeTestFunction } from './helpers-integration';
14+
import { unblockSignal, versionQuery } from './workflows/';
1415

1516
const test = makeTestFunction({ workflowsPath: __filename });
1617

@@ -33,7 +34,7 @@ test('Worker deployment based versioning', async (t) => {
3334
};
3435

3536
const worker1 = await Worker.create({
36-
workflowsPath: require.resolve('./workflows'),
37+
workflowsPath: require.resolve('./deployment-versioning-v1'),
3738
activities,
3839
taskQueue,
3940
workerDeploymentOptions: {
@@ -47,7 +48,7 @@ test('Worker deployment based versioning', async (t) => {
4748
});
4849

4950
const worker2 = await Worker.create({
50-
workflowsPath: require.resolve('./workflows'),
51+
workflowsPath: require.resolve('./deployment-versioning-v2'),
5152
activities,
5253
taskQueue,
5354
workerDeploymentOptions: {
@@ -61,7 +62,7 @@ test('Worker deployment based versioning', async (t) => {
6162
});
6263

6364
const worker3 = await Worker.create({
64-
workflowsPath: require.resolve('./workflows'),
65+
workflowsPath: require.resolve('./deployment-versioning-v3'),
6566
activities,
6667
taskQueue,
6768
workerDeploymentOptions: {
@@ -79,41 +80,41 @@ test('Worker deployment based versioning', async (t) => {
7980
await setCurrentDeploymentVersion(client, describeResp1.conflictToken, w1DeploymentVersion);
8081

8182
// Start workflow 1 which will use the 1.0 worker on auto-upgrade
82-
const wf1 = await client.workflow.start('autoUpgradeWorkflow', {
83+
const wf1 = await client.workflow.start('deploymentVersioning', {
8384
taskQueue,
84-
workflowId: 'basic-versioning-v1-' + randomUUID(),
85+
workflowId: 'deployment-versioning-v1-' + randomUUID(),
8586
});
86-
const state1 = await wf1.query('state');
87+
const state1 = await wf1.query(versionQuery);
8788
assert.equal(state1, 'v1');
8889

8990
// Wait for worker 2 to be visible and set as current version
9091
const describeResp2 = await waitUntilWorkerDeploymentVisible(client, w2DeploymentVersion);
9192
await setCurrentDeploymentVersion(client, describeResp2.conflictToken, w2DeploymentVersion);
9293

9394
// Start workflow 2 which will use the 2.0 worker pinned
94-
const wf2 = await client.workflow.start('pinnedWorkflow', {
95+
const wf2 = await client.workflow.start('deploymentVersioning', {
9596
taskQueue,
96-
workflowId: 'basic-versioning-v2-' + randomUUID(),
97+
workflowId: 'deployment-versioning-v2-' + randomUUID(),
9798
});
98-
const state2 = await wf2.query('state');
99+
const state2 = await wf2.query(versionQuery);
99100
assert.equal(state2, 'v2');
100101

101102
// Wait for worker 3 to be visible and set as current version
102103
const describeResp3 = await waitUntilWorkerDeploymentVisible(client, w3DeploymentVersion);
103104
await setCurrentDeploymentVersion(client, describeResp3.conflictToken, w3DeploymentVersion);
104105

105106
// Start workflow 3 which will use the 3.0 worker on auto-upgrade
106-
const wf3 = await client.workflow.start('autoUpgradeWorkflow', {
107+
const wf3 = await client.workflow.start('deploymentVersioning', {
107108
taskQueue,
108-
workflowId: 'basic-versioning-v3-' + randomUUID(),
109+
workflowId: 'deployment-versioning-v3-' + randomUUID(),
109110
});
110-
const state3 = await wf3.query('state');
111+
const state3 = await wf3.query(versionQuery);
111112
assert.equal(state3, 'v3');
112113

113114
// Signal all workflows to finish
114-
await wf1.signal('doFinish');
115-
await wf2.signal('doFinish');
116-
await wf3.signal('doFinish');
115+
await wf1.signal(unblockSignal);
116+
await wf2.signal(unblockSignal);
117+
await wf3.signal(unblockSignal);
117118

118119
const res1 = await wf1.result();
119120
const res2 = await wf2.result();
@@ -132,27 +133,139 @@ test('Worker deployment based versioning', async (t) => {
132133
t.pass();
133134
});
134135

136+
test('Worker deployment based versioning with ramping', async (t) => {
137+
const taskQueue = 'worker-deployment-based-ramping-' + randomUUID();
138+
const deploymentName = 'deployment-ramping-' + randomUUID();
139+
const client = t.context.env.client;
140+
141+
const v1 = {
142+
buildId: '1.0',
143+
deploymentName: deploymentName,
144+
};
145+
const v2 = {
146+
buildId: '2.0',
147+
deploymentName: deploymentName,
148+
};
149+
150+
const worker1 = await Worker.create({
151+
workflowsPath: require.resolve('./deployment-versioning-v1'),
152+
activities,
153+
taskQueue,
154+
workerDeploymentOptions: {
155+
useWorkerVersioning: true,
156+
version: v1,
157+
},
158+
});
159+
const worker1Promise = worker1.run();
160+
worker1Promise.catch((err) => {
161+
t.fail('Worker 1.0 run error: ' + JSON.stringify(err));
162+
});
163+
164+
const worker2 = await Worker.create({
165+
workflowsPath: require.resolve('./deployment-versioning-v2'),
166+
activities,
167+
taskQueue,
168+
workerDeploymentOptions: {
169+
useWorkerVersioning: true,
170+
version: v2,
171+
},
172+
});
173+
const worker2Promise = worker2.run();
174+
worker2Promise.catch((err) => {
175+
t.fail('Worker 2.0 run error: ' + JSON.stringify(err));
176+
});
177+
178+
// Wait for worker deployments to be visible
179+
await waitUntilWorkerDeploymentVisible(client, v1);
180+
const describeResp = await waitUntilWorkerDeploymentVisible(client, v2);
181+
182+
// Set current version to v1 and ramp v2 to 100%
183+
let conflictToken = (await setCurrentDeploymentVersion(client, describeResp.conflictToken, v1)).conflictToken;
184+
conflictToken = (await setRampingVersion(client, conflictToken, v2, 100)).conflictToken;
185+
186+
// Run workflows and verify they run on v2
187+
for (let i = 0; i < 3; i++) {
188+
const wf = await client.workflow.start('deploymentVersioning', {
189+
taskQueue,
190+
workflowId: `versioning-ramp-100-${i}-${randomUUID()}`,
191+
});
192+
await wf.signal(unblockSignal);
193+
const res = await wf.result();
194+
assert.equal(res, 'version-v2');
195+
}
196+
197+
// Set ramp to 0, expecting workflows to run on v1
198+
conflictToken = (await setRampingVersion(client, conflictToken, v2, 0)).conflictToken;
199+
for (let i = 0; i < 3; i++) {
200+
const wf = await client.workflow.start('deploymentVersioning', {
201+
taskQueue,
202+
workflowId: `versioning-ramp-0-${i}-${randomUUID()}`,
203+
});
204+
await wf.signal(unblockSignal);
205+
const res = await wf.result();
206+
assert.equal(res, 'version-v1');
207+
}
208+
209+
// Set ramp to 50 and eventually verify workflows run on both versions
210+
await setRampingVersion(client, conflictToken, v2, 50);
211+
const seenResults = new Set<string>();
212+
213+
const runAndRecord = async () => {
214+
const wf = await client.workflow.start('deploymentVersioning', {
215+
taskQueue,
216+
workflowId: `versioning-ramp-50-${randomUUID()}`,
217+
});
218+
await wf.signal(unblockSignal);
219+
return await wf.result();
220+
};
221+
222+
await asyncRetry(
223+
async () => {
224+
const res = await runAndRecord();
225+
seenResults.add(res);
226+
if (!seenResults.has('version-v1') || !seenResults.has('version-v2')) {
227+
throw new Error('Not all versions seen yet');
228+
}
229+
},
230+
{ maxTimeout: 1000, retries: 20 }
231+
);
232+
233+
worker1.shutdown();
234+
worker2.shutdown();
235+
await worker1Promise;
236+
await worker2Promise;
237+
t.pass();
238+
});
239+
240+
async function setRampingVersion(
241+
client: Client,
242+
conflictToken: Uint8Array,
243+
version: WorkerDeploymentVersion,
244+
percentage: number
245+
) {
246+
return await client.workflowService.setWorkerDeploymentRampingVersion({
247+
namespace: client.options.namespace,
248+
deploymentName: version.deploymentName,
249+
version: toCanonicalString(version),
250+
conflictToken,
251+
percentage,
252+
});
253+
}
254+
135255
async function waitUntilWorkerDeploymentVisible(client: Client, version: WorkerDeploymentVersion) {
136256
return await asyncRetry(
137257
async () => {
138-
try {
139-
const resp = await client.workflowService.describeWorkerDeployment({
140-
namespace: client.options.namespace,
141-
deploymentName: version.deploymentName,
142-
});
143-
144-
const isVersionVisible = resp.workerDeploymentInfo!.versionSummaries!.some(
145-
(vs) => vs.version === version.buildId
146-
);
147-
148-
if (!isVersionVisible) {
149-
throw new Error('Version not visible yet');
150-
}
151-
152-
return resp;
153-
} catch (error) {
154-
throw error;
258+
const resp = await client.workflowService.describeWorkerDeployment({
259+
namespace: client.options.namespace,
260+
deploymentName: version.deploymentName,
261+
});
262+
const isVersionVisible = resp.workerDeploymentInfo!.versionSummaries!.some(
263+
(vs) => vs.version === toCanonicalString(version)
264+
);
265+
if (!isVersionVisible) {
266+
throw new Error('Version not visible yet');
155267
}
268+
return resp;
156269
},
157270
{ maxTimeout: 1000, retries: 10 }
158271
);
@@ -166,7 +279,7 @@ async function setCurrentDeploymentVersion(
166279
return await client.workflowService.setWorkerDeploymentCurrentVersion({
167280
namespace: client.options.namespace,
168281
deploymentName: version.deploymentName,
169-
version: version.buildId,
282+
version: toCanonicalString(version),
170283
conflictToken,
171284
});
172285
}
+2-1
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
/* eslint-disable no-duplicate-imports */
2-
import { defineSignal } from '@temporalio/workflow';
2+
import { defineQuery, defineSignal } from '@temporalio/workflow';
33

44
export const activityStartedSignal = defineSignal('activityStarted');
55
export const failSignal = defineSignal('fail');
66
export const failWithMessageSignal = defineSignal<[string]>('fail');
77
export const argsTestSignal = defineSignal<[number, string]>('argsTest');
88
export const unblockSignal = defineSignal('unblock');
9+
export const versionQuery = defineQuery('version');

0 commit comments

Comments
 (0)