Skip to content

Commit 6b380c5

Browse files
committed
Dynamic workflow support
1 parent 1397704 commit 6b380c5

File tree

5 files changed

+129
-4
lines changed

5 files changed

+129
-4
lines changed

packages/common/src/workflow-options.ts

+1
Original file line numberDiff line numberDiff line change
@@ -252,5 +252,6 @@ export function extractWorkflowType<T extends Workflow>(
252252
}
253253

254254
export function isWorkflowFunctionWithOptions(obj: any): obj is WorkflowFunctionWithOptions<any[], any> {
255+
if (obj === undefined || obj === null) return false;
255256
return obj.__temporal_is_workflow_function_with_options === true;
256257
}

packages/test/src/deployment-versioning-v1/index.ts

+6
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,9 @@ export async function deploymentVersioning(): Promise<string> {
99
await condition(() => doFinish);
1010
return 'version-v1';
1111
}
12+
13+
// Dynamic/default workflow handler
14+
export default defineWorkflowWithOptions({ versioningBehavior: 'pinned' }, _default);
15+
async function _default(): Promise<string> {
16+
return 'dynamic';
17+
}

packages/test/src/test-worker-deployment-versioning.ts

+116
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import * as activities from './activities';
1212
import { toCanonicalString, WorkerDeploymentVersion } from '@temporalio/common';
1313
import { makeTestFunction } from './helpers-integration';
1414
import { unblockSignal, versionQuery } from './workflows/';
15+
import { temporal } from '@temporalio/proto';
1516

1617
const test = makeTestFunction({ workflowsPath: __filename });
1718

@@ -40,6 +41,7 @@ test('Worker deployment based versioning', async (t) => {
4041
workerDeploymentOptions: {
4142
useWorkerVersioning: true,
4243
version: w1DeploymentVersion,
44+
defaultVersioningBehavior: 'pinned',
4345
},
4446
});
4547
const worker1Promise = worker1.run();
@@ -54,6 +56,7 @@ test('Worker deployment based versioning', async (t) => {
5456
workerDeploymentOptions: {
5557
useWorkerVersioning: true,
5658
version: w2DeploymentVersion,
59+
defaultVersioningBehavior: 'pinned',
5760
},
5861
});
5962
const worker2Promise = worker2.run();
@@ -68,6 +71,7 @@ test('Worker deployment based versioning', async (t) => {
6871
workerDeploymentOptions: {
6972
useWorkerVersioning: true,
7073
version: w3DeploymentVersion,
74+
defaultVersioningBehavior: 'pinned',
7175
},
7276
});
7377
const worker3Promise = worker3.run();
@@ -154,6 +158,7 @@ test('Worker deployment based versioning with ramping', async (t) => {
154158
workerDeploymentOptions: {
155159
useWorkerVersioning: true,
156160
version: v1,
161+
defaultVersioningBehavior: 'pinned',
157162
},
158163
});
159164
const worker1Promise = worker1.run();
@@ -168,6 +173,7 @@ test('Worker deployment based versioning with ramping', async (t) => {
168173
workerDeploymentOptions: {
169174
useWorkerVersioning: true,
170175
version: v2,
176+
defaultVersioningBehavior: 'pinned',
171177
},
172178
});
173179
const worker2Promise = worker2.run();
@@ -237,6 +243,116 @@ test('Worker deployment based versioning with ramping', async (t) => {
237243
t.pass();
238244
});
239245

246+
test('Worker deployment with dynamic workflow on run', async (t) => {
247+
if (t.context.env.supportsTimeSkipping) {
248+
t.pass("Test Server doesn't support worker deployments");
249+
return;
250+
}
251+
252+
const taskQueue = 'worker-deployment-dynamic-' + randomUUID();
253+
const deploymentName = 'deployment-dynamic-' + randomUUID();
254+
const client = t.context.env.client;
255+
256+
const version = {
257+
buildId: '1.0',
258+
deploymentName: deploymentName,
259+
};
260+
261+
const worker = await Worker.create({
262+
workflowsPath: require.resolve('./deployment-versioning-v1'),
263+
activities,
264+
taskQueue,
265+
workerDeploymentOptions: {
266+
useWorkerVersioning: true,
267+
version: version,
268+
defaultVersioningBehavior: 'auto-upgrade',
269+
},
270+
});
271+
272+
const workerPromise = worker.run();
273+
workerPromise.catch((err) => {
274+
t.fail('Worker run error: ' + JSON.stringify(err));
275+
});
276+
277+
const describeResp = await waitUntilWorkerDeploymentVisible(client, version);
278+
await setCurrentDeploymentVersion(client, describeResp.conflictToken, version);
279+
280+
const wf = await client.workflow.start('cooldynamicworkflow', {
281+
taskQueue,
282+
workflowId: 'dynamic-workflow-versioning-' + randomUUID(),
283+
});
284+
285+
const result = await wf.result();
286+
assert.equal(result, 'dynamic');
287+
288+
// Check history for versioning behavior
289+
const history = await wf.fetchHistory();
290+
291+
const hasPinnedVersioningBehavior = history.events!.some(
292+
(event) =>
293+
event.workflowTaskCompletedEventAttributes &&
294+
event.workflowTaskCompletedEventAttributes.versioningBehavior ===
295+
temporal.api.enums.v1.VersioningBehavior.VERSIONING_BEHAVIOR_PINNED
296+
);
297+
assert.ok(hasPinnedVersioningBehavior, 'Expected workflow to use pinned versioning behavior');
298+
299+
worker.shutdown();
300+
await workerPromise;
301+
t.pass();
302+
});
303+
304+
test('Workflows can use default versioning behavior', async (t) => {
305+
const taskQueue = 'task-queue-default-versioning-' + randomUUID();
306+
const deploymentName = 'deployment-default-versioning-' + randomUUID();
307+
const client = t.context.env.client;
308+
309+
const workerV1 = {
310+
buildId: '1.0',
311+
deploymentName: deploymentName,
312+
};
313+
314+
const worker = await Worker.create({
315+
workflowsPath: require.resolve('./deployment-versioning-no-annotations'),
316+
activities,
317+
taskQueue,
318+
workerDeploymentOptions: {
319+
useWorkerVersioning: true,
320+
version: workerV1,
321+
defaultVersioningBehavior: 'pinned',
322+
},
323+
});
324+
325+
const workerPromise = worker.run();
326+
workerPromise.catch((err) => {
327+
t.fail('Worker run error: ' + JSON.stringify(err));
328+
});
329+
330+
const describeResp = await waitUntilWorkerDeploymentVisible(client, workerV1);
331+
await setCurrentDeploymentVersion(client, describeResp.conflictToken, workerV1);
332+
333+
const wf = await client.workflow.start('noVersioningAnnotationWorkflow', {
334+
taskQueue,
335+
workflowId: 'default-versioning-behavior-' + randomUUID(),
336+
});
337+
338+
await wf.result();
339+
340+
// Check history for versioning behavior
341+
const history = await wf.fetchHistory();
342+
343+
const hasPinnedVersioningBehavior = history.events!.some(
344+
(event) =>
345+
event.workflowTaskCompletedEventAttributes &&
346+
event.workflowTaskCompletedEventAttributes.versioningBehavior ===
347+
temporal.api.enums.v1.VersioningBehavior.VERSIONING_BEHAVIOR_PINNED
348+
);
349+
assert.ok(hasPinnedVersioningBehavior, 'Expected workflow to use pinned versioning behavior');
350+
351+
worker.shutdown();
352+
await workerPromise;
353+
t.pass();
354+
});
355+
240356
async function setRampingVersion(
241357
client: Client,
242358
conflictToken: Uint8Array,

packages/worker/src/worker-options.ts

+3-4
Original file line numberDiff line numberDiff line change
@@ -138,11 +138,10 @@ export interface WorkerOptions {
138138
useWorkerVersioning: boolean;
139139

140140
/**
141-
* If specified, the default versioning behavior to use for all workflows on this worker.
142-
* If not specified, and `useWorkerVersioning` is true, workflows that do not specify a
143-
* versioning behavior via {@link TODO} will cause an error to be thrown on startup.
141+
* The default versioning behavior to use for all workflows on this worker. Specifying a default
142+
* behavior is required,
144143
*/
145-
defaultVersioningBehavior?: VersioningBehavior;
144+
defaultVersioningBehavior: VersioningBehavior;
146145
};
147146

148147
/**

packages/workflow/src/worker-interface.ts

+3
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,9 @@ export function initRuntime(options: WorkflowCreateOptionsInternal): void {
8282
if (isWorkflowFunctionWithOptions(workflowFn)) {
8383
activator.workflow = workflowFn;
8484
activator.versioningBehavior = workflowFn.options.versioningBehavior;
85+
} else if (isWorkflowFunctionWithOptions(defaultWorkflowFn)) {
86+
activator.workflow = defaultWorkflowFn;
87+
activator.versioningBehavior = defaultWorkflowFn.options.versioningBehavior;
8588
} else if (typeof workflowFn === 'function') {
8689
activator.workflow = workflowFn;
8790
} else if (typeof defaultWorkflowFn === 'function') {

0 commit comments

Comments
 (0)