Skip to content

User metadata #1657

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions packages/client/src/schedule-helpers.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
import Long from 'long'; // eslint-disable-line import/no-named-as-default
import { compileRetryPolicy, decompileRetryPolicy, extractWorkflowType, LoadedDataConverter } from '@temporalio/common';
import {
compileRetryPolicy,
decompileRetryPolicy,
extractWorkflowType,
JsonPayloadConverter,
LoadedDataConverter,
} from '@temporalio/common';
import {
encodeUnifiedSearchAttributes,
decodeSearchAttributes,
Expand Down Expand Up @@ -189,8 +195,7 @@ export function decodeOptionalStructuredCalendarSpecs(
}

export function compileScheduleOptions(options: ScheduleOptions): CompiledScheduleOptions {
const workflowTypeOrFunc = options.action.workflowType;
const workflowType = extractWorkflowType(workflowTypeOrFunc);
const workflowType = extractWorkflowType(options.action.workflowType);
return {
...options,
action: {
Expand Down Expand Up @@ -240,6 +245,7 @@ export async function encodeScheduleAction(
action: CompiledScheduleAction,
headers: Headers
): Promise<temporal.api.schedule.v1.IScheduleAction> {
const jsonConverter = new JsonPayloadConverter();
return {
startWorkflow: {
workflowId: action.workflowId,
Expand All @@ -263,6 +269,10 @@ export async function encodeScheduleAction(
}
: undefined,
header: { fields: headers },
userMetadata: {
summary: jsonConverter.toPayload(action.staticSummary),
details: jsonConverter.toPayload(action.staticDetails),
},
},
};
}
Expand Down Expand Up @@ -312,6 +322,8 @@ export async function decodeScheduleAction(
pb: temporal.api.schedule.v1.IScheduleAction
): Promise<ScheduleDescriptionAction> {
if (pb.startWorkflow) {
const jsonConverter = new JsonPayloadConverter();
const userMetadata = pb.startWorkflow?.userMetadata;
return {
type: 'startWorkflow',
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
Expand All @@ -328,6 +340,8 @@ export async function decodeScheduleAction(
workflowExecutionTimeout: optionalTsToMs(pb.startWorkflow.workflowExecutionTimeout),
workflowRunTimeout: optionalTsToMs(pb.startWorkflow.workflowRunTimeout),
workflowTaskTimeout: optionalTsToMs(pb.startWorkflow.workflowTaskTimeout),
staticSummary: userMetadata?.summary ? jsonConverter.fromPayload(userMetadata.summary) : undefined,
staticDetails: userMetadata?.details ? jsonConverter.fromPayload(userMetadata.details) : undefined,
};
}
throw new TypeError('Unsupported schedule action');
Expand Down
4 changes: 4 additions & 0 deletions packages/client/src/schedule-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -783,6 +783,8 @@ export type ScheduleOptionsStartWorkflowAction<W extends Workflow> = {
| 'workflowExecutionTimeout'
| 'workflowRunTimeout'
| 'workflowTaskTimeout'
| 'staticDetails'
| 'staticSummary'
> & {
/**
* Workflow id to use when starting. Assign a meaningful business id.
Expand Down Expand Up @@ -815,6 +817,8 @@ export type ScheduleDescriptionStartWorkflowAction = ScheduleSummaryStartWorkflo
| 'workflowExecutionTimeout'
| 'workflowRunTimeout'
| 'workflowTaskTimeout'
| 'staticSummary'
| 'staticDetails'
>;

// Invariant: an existing ScheduleDescriptionAction can be used as is to create or update a schedule
Expand Down
2 changes: 2 additions & 0 deletions packages/client/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ export interface CountWorkflowExecution {
export type WorkflowExecutionDescription = Replace<
WorkflowExecutionInfo,
{
staticSummary?: string;
staticDetails?: string;
raw: DescribeWorkflowExecutionResponse;
}
>;
Expand Down
18 changes: 16 additions & 2 deletions packages/client/src/workflow-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import {
decodeRetryState,
encodeWorkflowIdConflictPolicy,
WorkflowIdConflictPolicy,
JsonPayloadConverter,
} from '@temporalio/common';
import { encodeUnifiedSearchAttributes } from '@temporalio/common/lib/converter/payload-search-attributes';
import { composeInterceptors } from '@temporalio/common/lib/interceptors';
Expand Down Expand Up @@ -510,7 +511,7 @@ export class WorkflowClient extends BaseClient {

protected async _start<T extends Workflow>(
workflowTypeOrFunc: string | T,
options: WithWorkflowArgs<T, WorkflowOptions>,
options: WorkflowStartOptions<T>,
interceptors: WorkflowClientInterceptor[]
): Promise<string> {
const workflowType = extractWorkflowType(workflowTypeOrFunc);
Expand Down Expand Up @@ -1196,6 +1197,7 @@ export class WorkflowClient extends BaseClient {
protected async _signalWithStartWorkflowHandler(input: WorkflowSignalWithStartInput): Promise<string> {
const { identity } = this.options;
const { options, workflowType, signalName, signalArgs, headers } = input;
const jsonConverter = new JsonPayloadConverter();
const req: temporal.api.workflowservice.v1.ISignalWithStartWorkflowExecutionRequest = {
namespace: this.options.namespace,
identity,
Expand Down Expand Up @@ -1225,6 +1227,10 @@ export class WorkflowClient extends BaseClient {
: undefined,
cronSchedule: options.cronSchedule,
header: { fields: headers },
userMetadata: {
summary: jsonConverter.toPayload(options?.staticSummary),
details: jsonConverter.toPayload(options?.staticDetails),
},
};
try {
return (await this.workflowService.signalWithStartWorkflowExecution(req)).runId;
Expand Down Expand Up @@ -1265,7 +1271,7 @@ export class WorkflowClient extends BaseClient {
protected async createStartWorkflowRequest(input: WorkflowStartInput): Promise<StartWorkflowExecutionRequest> {
const { options: opts, workflowType, headers } = input;
const { identity, namespace } = this.options;

const jsonConverter = new JsonPayloadConverter();
return {
namespace,
identity,
Expand Down Expand Up @@ -1293,6 +1299,10 @@ export class WorkflowClient extends BaseClient {
: undefined,
cronSchedule: opts.cronSchedule,
header: { fields: headers },
userMetadata: {
summary: jsonConverter.toPayload(opts?.staticSummary),
details: jsonConverter.toPayload(opts?.staticDetails),
},
};
}

Expand Down Expand Up @@ -1426,8 +1436,12 @@ export class WorkflowClient extends BaseClient {
workflowExecution: { workflowId, runId },
});
const info = await executionInfoFromRaw(raw.workflowExecutionInfo ?? {}, this.client.dataConverter, raw);
const jsonConverter = new JsonPayloadConverter();
const userMetadata = raw.executionConfig?.userMetadata;
return {
...info,
staticDetails: userMetadata?.details ? jsonConverter.fromPayload(userMetadata.details) : undefined,
staticSummary: userMetadata?.summary ? jsonConverter.fromPayload(userMetadata.summary) : undefined,
raw,
};
},
Expand Down
15 changes: 15 additions & 0 deletions packages/common/src/workflow-options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,21 @@ export interface BaseWorkflowOptions {
* by {@link typedSearchAttributes}.
*/
typedSearchAttributes?: SearchAttributePair[] | TypedSearchAttributes;

/**
* General fixed details for this workflow execution that may appear in UI/CLI.
* This can be in Temporal markdown format and can span multiple lines.
*
* @experimental
*/
staticDetails?: string;
/**
* A single-line fixed summary for this workflow execution that may appear in the UI/CLI.
* This can be in single-line Temporal markdown format.
*
* @experimental
*/
staticSummary?: string;
}

export type WithWorkflowArgs<W extends Workflow, T> = T &
Expand Down
77 changes: 76 additions & 1 deletion packages/test/src/test-integration-workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,19 @@ import { msToNumber, tsToMs } from '@temporalio/common/lib/time';
import { TestWorkflowEnvironment } from '@temporalio/testing';
import { CancelReason } from '@temporalio/worker/lib/activity';
import * as workflow from '@temporalio/workflow';
import { defineQuery, defineSignal } from '@temporalio/workflow';
import { defineQuery, defineSignal, setHandler } from '@temporalio/workflow';
import { SdkFlags } from '@temporalio/workflow/lib/flags';
import {
ActivityCancellationType,
ApplicationFailure,
defineSearchAttributeKey,
JsonPayloadConverter,
SearchAttributePair,
SearchAttributeType,
TypedSearchAttributes,
WorkflowExecutionAlreadyStartedError,
} from '@temporalio/common';
import { temporal } from '@temporalio/proto';
import { signalSchedulingWorkflow } from './activities/helpers';
import { activityStartedSignal } from './workflows/definitions';
import * as workflows from './workflows';
Expand Down Expand Up @@ -1337,3 +1339,76 @@ test('can register search attributes to dev server', async (t) => {
t.deepEqual(desc.searchAttributes, { 'new-search-attr': [12] }); // eslint-disable-line deprecation/deprecation
await env.teardown();
});

export async function userMetadataWorkflow(): Promise<string> {
let done = false;
const signalDef = defineSignal('done');
setHandler(signalDef, () => {
done = true;
});

// That workflow should call an activity (with summary)
const { activityWithSummary } = workflow.proxyActivities({ scheduleToCloseTimeout: '10s' }).withSummaries({
activityWithSummary: 'activity summary',
});
await activityWithSummary();
// Should have a timer (with summary)
await workflow.sleep(5, 'timer summary');
// Set current details
workflow.setCurrentDetails('current wf details');
// Unblock on var -> query current details (or return)
await workflow.condition(() => done);
return workflow.getCurrentDetails();
}

test('User metadata on workflow, timer, activity', async (t) => {
const { createWorker, startWorkflow } = helpers(t);
const worker = await createWorker({
activities: {
async activityWithSummary() {},
},
});

await worker.runUntil(async () => {
// Start a workflow with static details
const handle = await startWorkflow(userMetadataWorkflow, {
staticSummary: 'wf static summary',
staticDetails: 'wf static details',
});
// Describe workflow -> static summary, static details
const desc = await handle.describe();
t.true(desc.staticSummary === 'wf static summary');
t.true(desc.staticDetails === 'wf static details');

await handle.signal('done');
const res = await handle.result();
t.true(res === 'current wf details');

// Get history events for timer and activity summaries.
const resp = await t.context.env.client.workflowService.getWorkflowExecutionHistory({
namespace: t.context.env.client.options.namespace,
execution: {
workflowId: handle.workflowId,
runId: handle.firstExecutionRunId,
},
});
const jsonConverter = new JsonPayloadConverter();
for (const event of resp.history?.events ?? []) {
if (event.eventType === temporal.api.enums.v1.EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED) {
t.deepEqual(jsonConverter.fromPayload(event.userMetadata?.summary ?? {}), 'wf static summary');
t.deepEqual(jsonConverter.fromPayload(event.userMetadata?.details ?? {}), 'wf static details');
} else if (event.eventType === temporal.api.enums.v1.EventType.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED) {
t.deepEqual(jsonConverter.fromPayload(event.userMetadata?.summary ?? {}), 'activity summary');
} else if (event.eventType === temporal.api.enums.v1.EventType.EVENT_TYPE_TIMER_STARTED) {
t.deepEqual(jsonConverter.fromPayload(event.userMetadata?.summary ?? {}), 'timer summary');
}
}

// Run metadata query -> get current details
const wfMetadata = (await handle.query('__temporal_workflow_metadata')) as temporal.api.sdk.v1.IWorkflowMetadata;
t.deepEqual(wfMetadata.definition?.signalDefinitions?.length, 1);
t.deepEqual(wfMetadata.definition?.signalDefinitions?.[0].name, 'done');
t.deepEqual(wfMetadata.definition?.queryDefinitions?.length, 3); // default queries
t.deepEqual(wfMetadata.currentDetails, 'current wf details');
});
});
27 changes: 27 additions & 0 deletions packages/test/src/test-schedules.ts
Original file line number Diff line number Diff line change
Expand Up @@ -881,4 +881,31 @@ if (RUN_INTEGRATION_TESTS) {
await handle.delete();
}
});

test.serial('User metadata on schedule', async (t) => {
const { client } = t.context;
const scheduleId = `schedule-with-user-metadata-${randomUUID()}`;
const handle = await client.schedule.create({
scheduleId,
spec: {},
action: {
type: 'startWorkflow',
workflowType: dummyWorkflow,
taskQueue,
staticSummary: 'schedule static summary',
staticDetails: 'schedule static details',
},
});

try {
const describedSchedule = await handle.describe();
t.deepEqual(describedSchedule.spec.calendars, []);
t.deepEqual(describedSchedule.spec.intervals, []);
t.deepEqual(describedSchedule.spec.skip, []);
t.deepEqual(describedSchedule.action.staticSummary, 'schedule static summary');
t.deepEqual(describedSchedule.action.staticDetails, 'schedule static details');
} finally {
await handle.delete();
}
});
}
25 changes: 25 additions & 0 deletions packages/workflow/src/interceptors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ export interface ActivityInput {
readonly options: ActivityOptions;
readonly headers: Headers;
readonly seq: number;
readonly cmdOpts?: WorkflowCommandOptions;
}

/** Input for WorkflowOutboundCallsInterceptor.scheduleLocalActivity */
Expand All @@ -91,6 +92,7 @@ export interface LocalActivityInput {
readonly seq: number;
readonly originalScheduleTime?: Timestamp;
readonly attempt: number;
readonly cmdOpts?: WorkflowCommandOptions;
}

/** Input for WorkflowOutboundCallsInterceptor.startChildWorkflowExecution */
Expand All @@ -101,10 +103,33 @@ export interface StartChildWorkflowExecutionInput {
readonly seq: number;
}

/**
* User metadata that can be attached to workflow commands.
*
* Current used for:
* - startTimer, scheduleActivity/scheduleLocalActivity commands
* - internal metadata query
*/
export interface UserMetadata {
/** @experimental A single line summary of the command's purpose */
summary?: string;
/** @experimental Additional details about the command for longer-text description, can span multiple lines */
details?: string;
}

/**
* Options that can be attached to workflow commands.
*/
export interface WorkflowCommandOptions {
/** User metadata for the command that may be persisted to history */
readonly userMetadata?: UserMetadata;
}

/** Input for WorkflowOutboundCallsInterceptor.startTimer */
export interface TimerInput {
readonly durationMs: number;
readonly seq: number;
readonly cmdOpts?: WorkflowCommandOptions;
}

/**
Expand Down
3 changes: 3 additions & 0 deletions packages/workflow/src/internals.ts
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ export class Activator implements ActivationHandler {
signalDefinitions,
updateDefinitions,
},
currentDetails: this.currentDetails,
};
},
description: 'Returns metadata associated with this workflow.',
Expand Down Expand Up @@ -416,6 +417,8 @@ export class Activator implements ActivationHandler {

public readonly registeredActivityNames: Set<string>;

public currentDetails: string = '';

constructor({
info,
now,
Expand Down
Loading
Loading