Skip to content

Commit

Permalink
Merge pull request #150 from upstash/DX-999-fixes
Browse files Browse the repository at this point in the history
Workflow: Feedback & Failure Body
  • Loading branch information
CahidArda authored Aug 16, 2024
2 parents 7635624 + c735a7c commit 07b25b9
Show file tree
Hide file tree
Showing 30 changed files with 271 additions and 122 deletions.
2 changes: 1 addition & 1 deletion examples/workflow/bootstrap.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ project_arg="$1"
path_arg="$2"

# Start ngrok and capture the public URL
ngrok http localhost:3000 --log=stdout > ngrok.log &
ngrok http localhost:3001 --log=stdout > ngrok.log &
NGROK_PID=$!
sleep 5 # Allow some time for ngrok to start

Expand Down
4 changes: 2 additions & 2 deletions examples/workflow/nextjs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
"version": "0.1.0",
"private": true,
"scripts": {
"dev": "next dev",
"dev": "next dev -p 3001",
"build": "next build",
"start": "next start",
"start": "next start -p 3001",
"lint": "next lint"
},
"dependencies": {
Expand Down
2 changes: 1 addition & 1 deletion examples/workflow/nuxt/nuxt.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@ export default defineNuxtConfig({
},
},
devServer: {
port: 3000
port: 3001
}
})
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ export const POST = serve<Invoice>({
return true
})
},
client: new Client({
qstashClient: new Client({
baseUrl: env.QSTASH_URL!,
token: env.QSTASH_TOKEN!,
}),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ export const POST = serve<Invoice>({
return true
})
},
client: new Client({
qstashClient: new Client({
baseUrl: env.QSTASH_URL!,
token: env.QSTASH_TOKEN!,
}),
Expand Down
2 changes: 1 addition & 1 deletion examples/workflow/sveltekit/src/routes/path/+server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ export const POST = serve<string>({
console.log("step 2 input", result1, "output", output)
});
},
client: new Client({
qstashClient: new Client({
baseUrl: env.QSTASH_URL!,
token: env.QSTASH_TOKEN!,
}),
Expand Down
2 changes: 1 addition & 1 deletion examples/workflow/sveltekit/src/routes/sleep/+server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ export const POST = serve<string>({
console.log("step 3 input", result2, "output", output)
});
},
client: new Client({
qstashClient: new Client({
baseUrl: env.QSTASH_URL!,
token: env.QSTASH_TOKEN!,
}),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ export const POST = serve<Invoice>({
return true
})
},
client: new Client({
qstashClient: new Client({
baseUrl: env.QSTASH_URL!,
token: env.QSTASH_TOKEN!,
}),
Expand Down
2 changes: 1 addition & 1 deletion examples/workflow/sveltekit/vite.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ export default defineConfig({
preserveSymlinks: true,
},
server: {
port: 3000,
port: 3001,
},
});
15 changes: 13 additions & 2 deletions platforms/nextjs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { Receiver } from "../src/receiver";

import type { WorkflowServeParameters } from "../src/client/workflow";
import { serve as serveBase } from "../src/client/workflow";
import { formatWorkflowError } from "../src/client/error";

export type VerifySignatureConfig = {
currentSigningKey?: string;
Expand Down Expand Up @@ -198,15 +199,25 @@ export function verifySignatureAppRouter(
export const serve = <TInitialPayload = unknown>({
routeFunction,
options,
}: WorkflowServeParameters<TInitialPayload, NextResponse>): ((
}: WorkflowServeParameters<TInitialPayload, NextResponse, "onStepFinish">): ((
request: NextRequest
) => Promise<NextResponse>) => {
return serveBase<TInitialPayload, NextRequest, NextResponse>({
const handler = serveBase<TInitialPayload, NextRequest, NextResponse>({
routeFunction,
options: {
onStepFinish: (workflowRunId: string) =>
new NextResponse(JSON.stringify({ workflowRunId }), { status: 200 }),
...options,
},
});

return async (request: NextRequest) => {
try {
return await handler(request);
} catch (error) {
return new NextResponse(JSON.stringify(formatWorkflowError(error)), {
status: 500,
});
}
};
};
22 changes: 6 additions & 16 deletions platforms/nuxt.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { H3Event } from "h3";
import { defineEventHandler, getHeader, readRawBody } from "h3";
import { Receiver } from "../src";
import { formatWorkflowError, Receiver } from "../src";

import type { WorkflowServeParameters } from "../src/client/workflow";
import { serve as serveBase } from "../src/client/workflow";
Expand Down Expand Up @@ -70,7 +70,7 @@ function transformHeaders(headers: IncomingHttpHeaders): [string, string][] {
export const serve = <TInitialPayload = unknown>({
routeFunction,
options,
}: WorkflowServeParameters<TInitialPayload, string>) => {
}: WorkflowServeParameters<TInitialPayload, Response, "onStepFinish">) => {
const handler = defineEventHandler(async (event) => {
const method = event.node.req.method;
if (method?.toUpperCase() !== "POST") {
Expand All @@ -92,24 +92,14 @@ export const serve = <TInitialPayload = unknown>({
method: "POST",
});

const serveHandler = serveBase<TInitialPayload, Request, string>({
const serveHandler = serveBase<TInitialPayload>({
routeFunction,
options: {
onStepFinish: (workflowRunId: string) => workflowRunId,
...options,
},
options,
});
try {
const workflowRunId = await serveHandler(request);
return {
status: 200,
body: { workflowRunId },
};
return await serveHandler(request);
} catch (error) {
return {
status: 500,
body: `Error running the workflow at URL '${url}'. Got error: ${error}`,
};
return new Response(JSON.stringify(formatWorkflowError(error)), { status: 500 });
}
});
return handler;
Expand Down
11 changes: 8 additions & 3 deletions platforms/solidjs.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { APIEvent, APIHandler } from "@solidjs/start/server";
import { Receiver } from "../src";
import { formatWorkflowError, Receiver } from "../src";

import type { WorkflowServeParameters } from "../src/client/workflow";
import { serve as serveBase } from "../src/client/workflow";
Expand Down Expand Up @@ -51,7 +51,7 @@ export const verifySignatureSolidjs = (
export const serve = <TInitialPayload = unknown>({
routeFunction,
options,
}: WorkflowServeParameters<TInitialPayload>) => {
}: WorkflowServeParameters<TInitialPayload, Response, "onStepFinish">) => {
// Create a handler which receives an event and calls the
// serveBase method
const handler = async (event: APIEvent) => {
Expand All @@ -68,7 +68,12 @@ export const serve = <TInitialPayload = unknown>({
});

// invoke serve handler and return result
return serveHandler(event.request);
try {
const result = await serveHandler(event.request);
return result;
} catch (error) {
return new Response(JSON.stringify(formatWorkflowError(error)), { status: 500 });
}
};
return handler;
};
16 changes: 10 additions & 6 deletions platforms/svelte.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { RequestHandler } from "@sveltejs/kit";
import { Receiver } from "../src";
import { formatWorkflowError, Receiver } from "../src";

import type { WorkflowServeParametersExtended } from "../src/client/workflow";
import { serve as serveBase } from "../src/client/workflow";
Expand Down Expand Up @@ -56,18 +56,22 @@ export const serve = <TInitialPayload = unknown>({
routeFunction,
options,
receiver,
client,
}: WorkflowServeParametersExtended<TInitialPayload>): RequestHandler => {
const handler: RequestHandler = ({ request }) => {
qstashClient,
}: WorkflowServeParametersExtended<TInitialPayload, Response, "onStepFinish">): RequestHandler => {
const handler: RequestHandler = async ({ request }) => {
const serveMethod = serveBase<TInitialPayload>({
routeFunction,
options: {
client,
qstashClient,
receiver,
...options,
},
});
return serveMethod(request);
try {
return await serveMethod(request);
} catch (error) {
return new Response(JSON.stringify(formatWorkflowError(error)), { status: 500 });
}
};

return handler;
Expand Down
10 changes: 10 additions & 0 deletions src/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { Schedules } from "./schedules";
import type { BodyInit, Event, GetEventsPayload, HeadersInit, HTTPMethods, State } from "./types";
import { UrlGroups } from "./url-groups";
import { getRequestPath, prefixHeaders, processHeaders } from "./utils";
import { Workflow } from "./workflow";

type ClientConfig = {
/**
Expand Down Expand Up @@ -308,6 +309,15 @@ export class Client {
return new Schedules(this.http);
}

/**
* Access the workflow API.
*
* cancel workflows.
*/
public get workflow(): Workflow {
return new Workflow(this.http);
}

/**
* Access the queue API.
*
Expand Down
21 changes: 20 additions & 1 deletion src/client/error.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { ChatRateLimit, RateLimit } from "./types";
import type { Step } from "./workflow/types";
import type { FailureFunctionPayload, Step } from "./workflow/types";

/**
* Result of 500 Internal Server Error
Expand Down Expand Up @@ -82,3 +82,22 @@ export class QstashWorkflowAbort extends Error {
this.stepInfo = stepInfo;
}
}

/**
* Formats an unknown error to match the FailureFunctionPayload format
*
* @param error
* @returns
*/
export const formatWorkflowError = (error: unknown): FailureFunctionPayload => {
return error instanceof Error
? {
error: error.name,
message: error.message,
stack: error.stack,
}
: {
error: "Error",
message: "An error occured while executing workflow.",
};
};
33 changes: 33 additions & 0 deletions src/client/workflow.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/* eslint-disable @typescript-eslint/no-non-null-assertion */
import { describe, test, expect } from "bun:test";
import { triggerFirstInvocation } from "./workflow/workflow-requests";
import { WorkflowContext } from "./workflow/context";
import { nanoid } from "nanoid";
import { Client } from "./client";
import { QstashError } from "./error";

describe("workflow tests", () => {
const qstashClient = new Client({ token: process.env.QSTASH_TOKEN! });
test("should delete workflow succesfully", async () => {
const workflowRunId = `wfr-${nanoid()}`;
const result = await triggerFirstInvocation(
new WorkflowContext({
qstashClient,
workflowRunId,
headers: new Headers({}) as Headers,
steps: [],
url: "https://some-url.com",
initialPayload: undefined,
})
);
expect(result.isOk()).toBeTrue();

const cancelResult = await qstashClient.workflow.cancel(workflowRunId);
expect(cancelResult).toBeTrue();

const throws = qstashClient.workflow.cancel(workflowRunId);
expect(throws).rejects.toThrow(
new QstashError(`{"error":"workflowRun ${workflowRunId} not found"}`)
);
});
});
2 changes: 1 addition & 1 deletion src/client/workflow/auto-executor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ describe("auto-executor", () => {

const getContext = (steps: Step[]) => {
return new SpyWorkflowContext({
client: new Client({ baseUrl: MOCK_QSTASH_SERVER_URL, token }),
qstashClient: new Client({ baseUrl: MOCK_QSTASH_SERVER_URL, token }),
workflowRunId,
initialPayload,
headers: new Headers({}) as Headers,
Expand Down
16 changes: 9 additions & 7 deletions src/client/workflow/auto-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,18 @@ export class AutoExecutor {
private debug?: WorkflowLogger;

private readonly nonPlanStepCount: number;
private readonly steps: Step[];
private indexInCurrentList = 0;
public stepCount = 0;
public planStepCount = 0;

protected executingStep: string | false = false;

constructor(context: WorkflowContext, debug?: WorkflowLogger) {
constructor(context: WorkflowContext, steps: Step[], debug?: WorkflowLogger) {
this.context = context;
this.debug = debug;
this.nonPlanStepCount = this.context.steps.filter((step) => !step.targetStep).length;
this.steps = steps;
this.nonPlanStepCount = this.steps.filter((step) => !step.targetStep).length;
}

/**
Expand Down Expand Up @@ -112,7 +114,7 @@ export class AutoExecutor {
*/
protected async runSingle<TResult>(lazyStep: BaseLazyStep<TResult>) {
if (this.stepCount < this.nonPlanStepCount) {
const step = this.context.steps[this.stepCount + this.planStepCount];
const step = this.steps[this.stepCount + this.planStepCount];
validateStep(lazyStep, step);
await this.debug?.log("INFO", "RUN_SINGLE", {
fromRequest: true,
Expand Down Expand Up @@ -150,7 +152,7 @@ export class AutoExecutor {
const initialStepCount = this.stepCount - (parallelSteps.length - 1);
const parallelCallState = this.getParallelCallState(parallelSteps.length, initialStepCount);

const sortedSteps = sortSteps(this.context.steps);
const sortedSteps = sortSteps(this.steps);

// get the expected concurrency. Will be undefined in the `first` case.
const plannedParallelStepCount = sortedSteps[initialStepCount + this.planStepCount]?.concurrent;
Expand Down Expand Up @@ -190,7 +192,7 @@ export class AutoExecutor {
*
* Execute the step and call qstash with the result
*/
const planStep = this.context.steps.at(-1);
const planStep = this.steps.at(-1);
if (!planStep || planStep.targetStep === undefined) {
throw new QstashWorkflowError(
`There must be a last step and it should have targetStep larger than 0.` +
Expand Down Expand Up @@ -278,7 +280,7 @@ export class AutoExecutor {
parallelStepCount: number,
initialStepCount: number
): ParallelCallState {
const remainingSteps = this.context.steps.filter(
const remainingSteps = this.steps.filter(
(step) => (step.targetStep ?? step.stepId) >= initialStepCount
);

Expand Down Expand Up @@ -317,7 +319,7 @@ export class AutoExecutor {

await this.debug?.log("SUBMIT", "SUBMIT_STEP", { length: steps.length, steps });

const result = await this.context.client.batchJSON(
const result = await this.context.qstashClient.batchJSON(
steps.map((singleStep) => {
const headers = getHeaders(
"false",
Expand Down
Loading

0 comments on commit 07b25b9

Please sign in to comment.