Skip to content

Commit

Permalink
Ignore wf canceled error in partial parallel execution (#50)
Browse files Browse the repository at this point in the history
* fix: ignore wf canceled error in partial parallel execution

* fix: disallow cancel and api in failure function
  • Loading branch information
CahidArda authored Jan 3, 2025
1 parent c534220 commit 53c3fc4
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 2 deletions.
6 changes: 5 additions & 1 deletion src/context/auto-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { LazyCallStep, type BaseLazyStep } from "./steps";
import { getHeaders } from "../workflow-requests";
import type { WorkflowLogger } from "../logger";
import { NO_CONCURRENCY } from "../constants";
import { QstashError } from "@upstash/qstash";

export class AutoExecutor {
private context: WorkflowContext;
Expand Down Expand Up @@ -227,7 +228,10 @@ export class AutoExecutor {
);
await this.submitStepsToQStash([resultStep], [parallelStep]);
} catch (error) {
if (error instanceof WorkflowAbort) {
if (
error instanceof WorkflowAbort ||
(error instanceof QstashError && error.status === 400)
) {
throw error;
}
throw new WorkflowError(
Expand Down
2 changes: 1 addition & 1 deletion src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ export type WorkflowServeOptions<
failureFunction?: (failureData: {
context: Omit<
WorkflowContext<TInitialPayload>,
"run" | "sleepUntil" | "sleep" | "call" | "waitForEvent" | "notify"
"run" | "sleepUntil" | "sleep" | "call" | "waitForEvent" | "notify" | "cancel" | "api"
>;
failStatus: number;
failResponse: string;
Expand Down
66 changes: 66 additions & 0 deletions src/workflow-requests.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -749,6 +749,72 @@ describe("Workflow Requests", () => {
}
);

test(
"should omit if triggerRouteFunction (with partial parallel step execution) gets can't publish to canceled workflow error",
async () => {
const workflowRunId = `wfr-${nanoid()}`;
const context = new WorkflowContext({
qstashClient,
workflowRunId: workflowRunId,
initialPayload: undefined,
headers: new Headers({}) as Headers,
steps: [
{
stepId: 0,
concurrent: 1,
stepName: "init",
stepType: "Initial",
targetStep: 1,
},
{
stepId: 0,
concurrent: 2,
stepName: "sleeping",
stepType: "SleepFor",
targetStep: 1,
},
],
url: WORKFLOW_ENDPOINT,
});

const debug = new WorkflowLogger({ logLevel: "INFO", logOutput: "console" });
const spy = spyOn(debug, "log");

await triggerFirstInvocation(context, 3, false, debug);
expect(spy).toHaveBeenCalledTimes(1);

await workflowClient.cancel({ ids: [workflowRunId] });

const result = await triggerRouteFunction({
onStep: async () => {
await Promise.all([context.sleep("sleeping", 10), context.sleep("sleeping", 10)]);
},
onCleanup: async () => {
throw new Error("shouldn't come here.");
},
onCancel: async () => {
throw new Error("shouldn't come here.");
},
debug,
});

expect(result.isOk()).toBeTrue();
// @ts-expect-error value will be set since stepFinish isOk
expect(result.value).toBe("workflow-was-finished");

expect(spy).toHaveBeenCalledTimes(2);
expect(spy).toHaveBeenLastCalledWith("WARN", "RESPONSE_WORKFLOW", {
message: "tried to append to a cancelled workflow. exiting without publishing.",
name: "QstashError",
errorMessage:
'[{"error":"failed to publish to url: can not append to a a cancelled workflow"}]',
});
},
{
timeout: 10000,
}
);

test(
"should omit the error if the workflow is created with the same id",
async () => {
Expand Down

0 comments on commit 53c3fc4

Please sign in to comment.