diff --git a/src/context/auto-executor.ts b/src/context/auto-executor.ts index cfc88c1..4abd55f 100644 --- a/src/context/auto-executor.ts +++ b/src/context/auto-executor.ts @@ -5,6 +5,7 @@ import { LazyCallStep, type BaseLazyStep } from "./steps"; import { getHeaders } from "../workflow-requests"; import type { WorkflowLogger } from "../logger"; import { NO_CONCURRENCY } from "../constants"; +import { QstashError } from "@upstash/qstash"; export class AutoExecutor { private context: WorkflowContext; @@ -227,7 +228,10 @@ export class AutoExecutor { ); await this.submitStepsToQStash([resultStep], [parallelStep]); } catch (error) { - if (error instanceof WorkflowAbort) { + if ( + error instanceof WorkflowAbort || + (error instanceof QstashError && error.status === 400) + ) { throw error; } throw new WorkflowError( diff --git a/src/types.ts b/src/types.ts index 8c11a39..27fbf65 100644 --- a/src/types.ts +++ b/src/types.ts @@ -188,7 +188,7 @@ export type WorkflowServeOptions< failureFunction?: (failureData: { context: Omit< WorkflowContext, - "run" | "sleepUntil" | "sleep" | "call" | "waitForEvent" | "notify" + "run" | "sleepUntil" | "sleep" | "call" | "waitForEvent" | "notify" | "cancel" | "api" >; failStatus: number; failResponse: string; diff --git a/src/workflow-requests.test.ts b/src/workflow-requests.test.ts index 4735d58..bcee9a0 100644 --- a/src/workflow-requests.test.ts +++ b/src/workflow-requests.test.ts @@ -749,6 +749,72 @@ describe("Workflow Requests", () => { } ); + test( + "should omit if triggerRouteFunction (with partial parallel step execution) gets can't publish to canceled workflow error", + async () => { + const workflowRunId = `wfr-${nanoid()}`; + const context = new WorkflowContext({ + qstashClient, + workflowRunId: workflowRunId, + initialPayload: undefined, + headers: new Headers({}) as Headers, + steps: [ + { + stepId: 0, + concurrent: 1, + stepName: "init", + stepType: "Initial", + targetStep: 1, + }, + { + stepId: 0, + concurrent: 2, + stepName: "sleeping", + stepType: "SleepFor", + targetStep: 1, + }, + ], + url: WORKFLOW_ENDPOINT, + }); + + const debug = new WorkflowLogger({ logLevel: "INFO", logOutput: "console" }); + const spy = spyOn(debug, "log"); + + await triggerFirstInvocation(context, 3, false, debug); + expect(spy).toHaveBeenCalledTimes(1); + + await workflowClient.cancel({ ids: [workflowRunId] }); + + const result = await triggerRouteFunction({ + onStep: async () => { + await Promise.all([context.sleep("sleeping", 10), context.sleep("sleeping", 10)]); + }, + onCleanup: async () => { + throw new Error("shouldn't come here."); + }, + onCancel: async () => { + throw new Error("shouldn't come here."); + }, + debug, + }); + + expect(result.isOk()).toBeTrue(); + // @ts-expect-error value will be set since stepFinish isOk + expect(result.value).toBe("workflow-was-finished"); + + expect(spy).toHaveBeenCalledTimes(2); + expect(spy).toHaveBeenLastCalledWith("WARN", "RESPONSE_WORKFLOW", { + message: "tried to append to a cancelled workflow. exiting without publishing.", + name: "QstashError", + errorMessage: + '[{"error":"failed to publish to url: can not append to a a cancelled workflow"}]', + }); + }, + { + timeout: 10000, + } + ); + test( "should omit the error if the workflow is created with the same id", async () => {