Skip to content

Commit 40d129c

Browse files
Introduce RetryableError (#569)
* Introduce RetryableError, allowing users to customize the retry after when needed. This is meant to be supported within `ctx.run` primarily. * Apply feedback: * Removed fields maxRetryAttempts and maxRetryDuration from RetryableError * Make sure uses can handle RetryableError within asTerminalError
1 parent 7add041 commit 40d129c

File tree

15 files changed

+290
-12
lines changed

15 files changed

+290
-12
lines changed

packages/restate-sdk-cloudflare-workers/patches/vm/sdk_shared_core_wasm_bindings.d.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ export class WasmVM {
100100
notify_input(buffer: Uint8Array): void;
101101
notify_input_closed(): void;
102102
notify_error(error_message: string, stacktrace?: string | null): void;
103+
notify_error_with_delay_override(error_message: string, stacktrace?: string | null, delay_override?: bigint | null): void;
103104
notify_error_for_next_command(error_message: string, stacktrace: string | null | undefined, wasm_command_type: WasmCommandType): void;
104105
notify_error_for_specific_command(error_message: string, stacktrace: string | null | undefined, wasm_command_type: WasmCommandType, command_index: number, command_name?: string | null): void;
105106
take_output(): any;
@@ -129,6 +130,7 @@ export class WasmVM {
129130
propose_run_completion_success(handle: number, buffer: Uint8Array): void;
130131
propose_run_completion_failure(handle: number, value: WasmFailure): void;
131132
propose_run_completion_failure_transient(handle: number, error_message: string, error_stacktrace: string | null | undefined, attempt_duration: bigint, config?: WasmExponentialRetryConfig | null): void;
133+
propose_run_completion_failure_transient_with_delay_override(handle: number, error_message: string, error_stacktrace: string | null | undefined, attempt_duration: bigint, delay_override?: bigint | null, max_retry_attempts_override?: number | null, max_retry_duration_override?: bigint | null): void;
132134
sys_cancel_invocation(target_invocation_id: string): void;
133135
sys_write_output_success(buffer: Uint8Array): void;
134136
sys_write_output_failure(value: WasmFailure): void;

packages/restate-sdk-cloudflare-workers/patches/vm/sdk_shared_core_wasm_bindings_bg.js

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -591,6 +591,18 @@ export class WasmVM {
591591
var len1 = WASM_VECTOR_LEN;
592592
wasm.wasmvm_notify_error(this.__wbg_ptr, ptr0, len0, ptr1, len1);
593593
}
594+
/**
595+
* @param {string} error_message
596+
* @param {string | null} [stacktrace]
597+
* @param {bigint | null} [delay_override]
598+
*/
599+
notify_error_with_delay_override(error_message, stacktrace, delay_override) {
600+
const ptr0 = passStringToWasm0(error_message, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
601+
const len0 = WASM_VECTOR_LEN;
602+
var ptr1 = isLikeNone(stacktrace) ? 0 : passStringToWasm0(stacktrace, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
603+
var len1 = WASM_VECTOR_LEN;
604+
wasm.wasmvm_notify_error_with_delay_override(this.__wbg_ptr, ptr0, len0, ptr1, len1, !isLikeNone(delay_override), isLikeNone(delay_override) ? BigInt(0) : delay_override);
605+
}
594606
/**
595607
* @param {string} error_message
596608
* @param {string | null | undefined} stacktrace
@@ -973,6 +985,25 @@ export class WasmVM {
973985
throw takeFromExternrefTable0(ret[0]);
974986
}
975987
}
988+
/**
989+
* @param {number} handle
990+
* @param {string} error_message
991+
* @param {string | null | undefined} error_stacktrace
992+
* @param {bigint} attempt_duration
993+
* @param {bigint | null} [delay_override]
994+
* @param {number | null} [max_retry_attempts_override]
995+
* @param {bigint | null} [max_retry_duration_override]
996+
*/
997+
propose_run_completion_failure_transient_with_delay_override(handle, error_message, error_stacktrace, attempt_duration, delay_override, max_retry_attempts_override, max_retry_duration_override) {
998+
const ptr0 = passStringToWasm0(error_message, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
999+
const len0 = WASM_VECTOR_LEN;
1000+
var ptr1 = isLikeNone(error_stacktrace) ? 0 : passStringToWasm0(error_stacktrace, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
1001+
var len1 = WASM_VECTOR_LEN;
1002+
const ret = wasm.wasmvm_propose_run_completion_failure_transient_with_delay_override(this.__wbg_ptr, handle, ptr0, len0, ptr1, len1, attempt_duration, !isLikeNone(delay_override), isLikeNone(delay_override) ? BigInt(0) : delay_override, isLikeNone(max_retry_attempts_override) ? 0x100000001 : (max_retry_attempts_override) >>> 0, !isLikeNone(max_retry_duration_override), isLikeNone(max_retry_duration_override) ? BigInt(0) : max_retry_duration_override);
1003+
if (ret[1]) {
1004+
throw takeFromExternrefTable0(ret[0]);
1005+
}
1006+
}
9761007
/**
9771008
* @param {string} target_invocation_id
9781009
*/

packages/restate-sdk-cloudflare-workers/patches/vm/sdk_shared_core_wasm_bindings_bg.wasm.d.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ export const wasmvm_get_response_head: (a: number) => number;
1919
export const wasmvm_notify_input: (a: number, b: number, c: number) => void;
2020
export const wasmvm_notify_input_closed: (a: number) => void;
2121
export const wasmvm_notify_error: (a: number, b: number, c: number, d: number, e: number) => void;
22+
export const wasmvm_notify_error_with_delay_override: (a: number, b: number, c: number, d: number, e: number, f: number, g: bigint) => void;
2223
export const wasmvm_notify_error_for_next_command: (a: number, b: number, c: number, d: number, e: number, f: number) => void;
2324
export const wasmvm_notify_error_for_specific_command: (a: number, b: number, c: number, d: number, e: number, f: number, g: number, h: number, i: number) => void;
2425
export const wasmvm_take_output: (a: number) => any;
@@ -48,6 +49,7 @@ export const wasmvm_sys_run: (a: number, b: number, c: number) => [number, numbe
4849
export const wasmvm_propose_run_completion_success: (a: number, b: number, c: number, d: number) => [number, number];
4950
export const wasmvm_propose_run_completion_failure: (a: number, b: number, c: any) => [number, number];
5051
export const wasmvm_propose_run_completion_failure_transient: (a: number, b: number, c: number, d: number, e: number, f: number, g: bigint, h: number) => [number, number];
52+
export const wasmvm_propose_run_completion_failure_transient_with_delay_override: (a: number, b: number, c: number, d: number, e: number, f: number, g: bigint, h: number, i: bigint, j: number, k: number, l: bigint) => [number, number];
5153
export const wasmvm_sys_cancel_invocation: (a: number, b: number, c: number) => [number, number];
5254
export const wasmvm_sys_write_output_success: (a: number, b: number, c: number) => [number, number];
5355
export const wasmvm_sys_write_output_failure: (a: number, b: any) => [number, number];

packages/restate-sdk-examples/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
"verify": "npm run format-check && npm run lint && npm run build",
3030
"release": "",
3131
"object": "RESTATE_LOGGING=debug tsx ./src/object.ts",
32-
"greeter": "RESTATE_LOGGING=debug tsx ./src/greeter_with_options.ts",
32+
"greeter": "RESTATE_LOGGING=debug tsx ./src/greeter.ts",
3333
"zgreeter": "RESTATE_LOGGING=debug tsx ./src/zod_greeter.ts",
3434
"workflow": "RESTATE_LOGGING=debug tsx ./src/workflow.ts",
3535
"workflow_client": "RESTATE_LOGGING=debug tsx ./src/workflow_client.ts",

packages/restate-sdk/src/common_api.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,8 @@ export {
108108
RestateError,
109109
TerminalError,
110110
TimeoutError,
111+
RetryableError,
112+
type RetryableErrorOptions,
111113
} from "./types/errors.js";
112114
export type {
113115
LoggerTransport,

packages/restate-sdk/src/context.ts

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -313,11 +313,22 @@ export interface Context extends RestateContext {
313313
* re-executed on replay (the latest, if the failure happened in the small windows
314314
* described above).
315315
*
316+
* You can customize retry options by either:
317+
*
318+
* - Providing retry policy options in {@link RunOptions}
319+
* - Throwing {@link RetryableError}, providing `retryAfter` option. This can be especially useful when interacting with HTTP requests returning the `Retry-After` header. You can combine the usage of throwing {@link RetryableError} with the `maxRetryAttempts`/`maxRetryDuration` from {@link RunOptions}.
320+
*
316321
* @example
317322
* ```ts
318323
* const result = await ctx.run(someExternalAction)
319324
*```
320-
325+
*
326+
* @example
327+
* ```ts
328+
* // Add some retry options
329+
* const result = await ctx.run("my action", someExternalAction, { maxRetryAttempts: 10 })
330+
* ```
331+
*
321332
* @example
322333
* ```ts
323334
* await ctx.run("payment action", async () => {
@@ -328,27 +339,41 @@ export interface Context extends RestateContext {
328339
* } else if (result.paymentGatewayBusy) {
329340
* // restate will retry automatically
330341
* // to bound retries, use RunOptions
331-
* throw new Exception("Payment gateway busy");
342+
* throw new Error("Payment gateway busy");
332343
* } else {
333344
* // success!
334345
* }
335346
* });
336347
*
348+
* @example
349+
* ```ts
350+
* await ctx.run("payment action", async () => {
351+
* const res = fetch(...);
352+
* if (!res.ok) {
353+
* // Read Retry-After header
354+
* const retryAfterHeader = res.headers['Retry-After']
355+
*
356+
* // Use RetryableError to customize in how long to retry
357+
* throw RetryableError.from(cause, { retryAfter: { seconds: retryAfterHeader } })
358+
* }
359+
* }, {
360+
* // Retry at most ten times
361+
* maxRetryAttempts: 10
362+
* });
337363
* ```
338364
*
339365
* @param action The function to run.
340366
*/
341367
run<T>(action: RunAction<T>): RestatePromise<T>;
342368

343369
/**
344-
* Run an operation and store the result in Restate. The operation will thus not
345-
* be re-run during a later replay, but take the durable result from Restate.
346-
*
347-
* @param name the action's name
348-
* @param action the action to run.
370+
* Same as {@link run}, but providing a name, used for observability purposes.
349371
*/
350372
run<T>(name: string, action: RunAction<T>): RestatePromise<T>;
351373

374+
/**
375+
* See {@link run}
376+
*/
352377
run<T>(
353378
name: string,
354379
action: RunAction<T>,

packages/restate-sdk/src/context_impl.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import {
3838
INTERNAL_ERROR_CODE,
3939
logError,
4040
RestateError,
41+
RetryableError,
4142
TerminalError,
4243
UNKNOWN_ERROR_CODE,
4344
} from "./types/errors.js";
@@ -436,6 +437,22 @@ export class ContextImpl implements ObjectContext, WorkflowContext {
436437
code: err.code,
437438
message: err.message,
438439
});
440+
} else if (err instanceof RetryableError) {
441+
const maxRetryDuration =
442+
options?.maxRetryDuration ?? options?.maxRetryDurationMillis;
443+
this.coreVm.propose_run_completion_failure_transient_with_delay_override(
444+
handle,
445+
err.message,
446+
err.stack,
447+
BigInt(attemptDuration),
448+
err.retryAfter !== undefined
449+
? BigInt(millisOrDurationToMillis(err.retryAfter))
450+
: undefined,
451+
options?.maxRetryAttempts,
452+
maxRetryDuration !== undefined
453+
? BigInt(millisOrDurationToMillis(maxRetryDuration))
454+
: undefined
455+
);
439456
} else {
440457
this.vmLogger.warn(
441458
`Error when processing ctx.run '${name}'.\n`,

packages/restate-sdk/src/endpoint/handlers/generic.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import {
1313
ensureError,
1414
logError,
1515
RestateError,
16+
RetryableError,
1617
TerminalError,
1718
} from "../../types/errors.js";
1819
import type { ProtocolMode } from "../../types/discovery.js";
@@ -37,6 +38,7 @@ import {
3738
LogSource,
3839
RestateLogLevel,
3940
} from "../../logging/logger_transport.js";
41+
import { millisOrDurationToMillis } from "@restatedev/restate-sdk-core";
4042

4143
export interface Headers {
4244
[name: string]: string | string[] | undefined;
@@ -389,6 +391,14 @@ export class GenericHandler implements RestateHandler {
389391
message: error.message,
390392
});
391393
coreVm.sys_end();
394+
} else if (error instanceof RetryableError) {
395+
coreVm.notify_error_with_delay_override(
396+
error.message,
397+
error.stack,
398+
error.retryAfter !== undefined
399+
? BigInt(millisOrDurationToMillis(error.retryAfter))
400+
: undefined
401+
);
392402
} else {
393403
coreVm.notify_error(error.message, error.stack);
394404
}

packages/restate-sdk/src/endpoint/handlers/vm/sdk_shared_core_wasm_bindings.d.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,11 @@ export class WasmVM {
111111
notify_input(buffer: Uint8Array): void;
112112
notify_input_closed(): void;
113113
notify_error(error_message: string, stacktrace?: string | null): void;
114+
notify_error_with_delay_override(
115+
error_message: string,
116+
stacktrace?: string | null,
117+
delay_override?: bigint | null
118+
): void;
114119
notify_error_for_next_command(
115120
error_message: string,
116121
stacktrace: string | null | undefined,
@@ -171,6 +176,15 @@ export class WasmVM {
171176
attempt_duration: bigint,
172177
config?: WasmExponentialRetryConfig | null
173178
): void;
179+
propose_run_completion_failure_transient_with_delay_override(
180+
handle: number,
181+
error_message: string,
182+
error_stacktrace: string | null | undefined,
183+
attempt_duration: bigint,
184+
delay_override?: bigint | null,
185+
max_retry_attempts_override?: number | null,
186+
max_retry_duration_override?: bigint | null
187+
): void;
174188
sys_cancel_invocation(target_invocation_id: string): void;
175189
sys_write_output_success(buffer: Uint8Array): void;
176190
sys_write_output_failure(value: WasmFailure): void;

0 commit comments

Comments
 (0)