diff --git a/bun.lockb b/bun.lockb index 931c030..4ee602c 100755 Binary files a/bun.lockb and b/bun.lockb differ diff --git a/examples/ci/app/test-routes/invoke/called-endpoint/route.ts b/examples/ci/app/test-routes/invoke/called-endpoint/route.ts index af31e61..e2b5820 100644 --- a/examples/ci/app/test-routes/invoke/called-endpoint/route.ts +++ b/examples/ci/app/test-routes/invoke/called-endpoint/route.ts @@ -1,6 +1,6 @@ import { expect } from "app/ci/utils"; export const GET = async (request: Request) => { - expect(request.headers.get("upstash-workflow-invoke-count"), null) + expect(request.headers.get("upstash-workflow-invoke-count"), "2") return new Response(JSON.stringify({}), { status: 200 }); } \ No newline at end of file diff --git a/examples/express/package.json b/examples/express/package.json index b0333f4..8115467 100644 --- a/examples/express/package.json +++ b/examples/express/package.json @@ -17,7 +17,6 @@ "express": "^4.19.2" }, "devDependencies": { - "@types/express": "^5.0.0", "@types/node": "^20.5.2", "tsx": "^4.7.3", "typescript": "^5.1.6" diff --git a/package.json b/package.json index aea35aa..7c15009 100644 --- a/package.json +++ b/package.json @@ -86,7 +86,7 @@ "@solidjs/start": "^1.0.8", "@sveltejs/kit": "^2.6.1", "@types/bun": "^1.1.10", - "@types/express": "^5.0.0", + "@types/express": "^5.0.1", "astro": "^4.16.7", "eslint": "^9.11.1", "eslint-plugin-unicorn": "^55.0.0", @@ -102,9 +102,9 @@ "typescript-eslint": "^8.18.0" }, "dependencies": { - "@ai-sdk/openai": "^1.0.15", + "@ai-sdk/openai": "^1.2.1", "@upstash/qstash": "^2.7.22", - "ai": "^4.0.30", + "ai": "^4.1.54", "zod": "^3.24.1" }, "directories": { diff --git a/src/agents/adapters.test.ts b/src/agents/adapters.test.ts index 9f6aaff..77dae70 100644 --- a/src/agents/adapters.test.ts +++ b/src/agents/adapters.test.ts @@ -52,7 +52,6 @@ describe("wrapTools", () => { expect(Object.entries(wrappedTools).length).toBe(1); const wrappedTool = wrappedTools["aiSDKTool"]; - // @ts-expect-error description exists but can't resolve the type expect(wrappedTool.description).toBe(aiSDKToolDescription); await mockQStashServer({ @@ -82,12 +81,9 @@ describe("wrapTools", () => { headers: { "upstash-workflow-sdk-version": "1", "content-type": "application/json", - "upstash-failure-callback-retries": "3", "upstash-feature-set": "LazyFetch,InitialBody", "upstash-forward-upstash-workflow-sdk-version": "1", - "upstash-forward-upstash-workflow-invoke-count": "0", "upstash-method": "POST", - "upstash-retries": "3", "upstash-workflow-init": "false", "upstash-workflow-runid": workflowRunId, "upstash-workflow-url": "https://requestcatcher.com/api", @@ -104,7 +100,6 @@ describe("wrapTools", () => { expect(Object.entries(wrappedTools).length).toBe(1); const wrappedTool = wrappedTools["langChainTool"]; - // @ts-expect-error description exists but can't resolve the type expect(wrappedTool.description).toBe(langChainToolDescription); await mockQStashServer({ @@ -134,12 +129,9 @@ describe("wrapTools", () => { headers: { "upstash-workflow-sdk-version": "1", "content-type": "application/json", - "upstash-failure-callback-retries": "3", "upstash-feature-set": "LazyFetch,InitialBody", "upstash-forward-upstash-workflow-sdk-version": "1", - "upstash-forward-upstash-workflow-invoke-count": "0", "upstash-method": "POST", - "upstash-retries": "3", "upstash-workflow-init": "false", "upstash-workflow-runid": workflowRunId, "upstash-workflow-url": "https://requestcatcher.com/api", @@ -156,11 +148,9 @@ describe("wrapTools", () => { expect(Object.entries(wrappedTools).length).toBe(2); const wrappedLangChainTool = wrappedTools["langChainTool"]; - // @ts-expect-error description exists but can't resolve the type expect(wrappedLangChainTool.description).toBe(langChainToolDescription); const wrappedAiSDKTool = wrappedTools["aiSDKTool"]; - // @ts-expect-error description exists but can't resolve the type expect(wrappedAiSDKTool.description).toBe(aiSDKToolDescription); }); @@ -180,7 +170,6 @@ describe("wrapTools", () => { expect(Object.entries(wrappedTools).length).toBe(1); const wrappedTool = wrappedTools["nonwrappedWorkflowTool"]; - // @ts-expect-error description exists but can't resolve the type expect(wrappedTool.description).toBe(workflowToolDescription); await mockQStashServer({ @@ -209,12 +198,9 @@ describe("wrapTools", () => { headers: { "content-type": "application/json", "upstash-delay": "1000s", - "upstash-failure-callback-retries": "3", "upstash-feature-set": "LazyFetch,InitialBody", "upstash-forward-upstash-workflow-sdk-version": "1", - "upstash-forward-upstash-workflow-invoke-count": "0", "upstash-method": "POST", - "upstash-retries": "3", "upstash-workflow-init": "false", "upstash-workflow-runid": workflowRunId, "upstash-workflow-sdk-version": "1", @@ -232,7 +218,6 @@ describe("wrapTools", () => { expect(Object.entries(wrappedTools).length).toBe(1); const wrappedTool = wrappedTools["wrappedWorkflowTool"]; - // @ts-expect-error description exists but can't resolve the type expect(wrappedTool.description).toBe(workflowToolDescription); await mockQStashServer({ @@ -261,12 +246,9 @@ describe("wrapTools", () => { destination: WORKFLOW_ENDPOINT, headers: { "content-type": "application/json", - "upstash-failure-callback-retries": "3", "upstash-feature-set": "LazyFetch,InitialBody", "upstash-forward-upstash-workflow-sdk-version": "1", - "upstash-forward-upstash-workflow-invoke-count": "0", "upstash-method": "POST", - "upstash-retries": "3", "upstash-workflow-init": "false", "upstash-workflow-runid": workflowRunId, "upstash-workflow-sdk-version": "1", diff --git a/src/agents/agent.test.ts b/src/agents/agent.test.ts index a6243cd..4b6f90d 100644 --- a/src/agents/agent.test.ts +++ b/src/agents/agent.test.ts @@ -24,6 +24,7 @@ describe("agents", () => { steps: [], url: WORKFLOW_ENDPOINT, workflowRunId, + retries: 5, }); const agentsApi = new WorkflowAgents({ context }); @@ -93,13 +94,10 @@ describe("agents", () => { "upstash-callback-forward-upstash-workflow-stepid": "1", "upstash-callback-forward-upstash-workflow-stepname": "Call Agent my agent", "upstash-callback-forward-upstash-workflow-steptype": "Call", - "upstash-callback-forward-upstash-workflow-invoke-count": "0", - "upstash-callback-retries": "3", "upstash-callback-workflow-calltype": "fromCallback", "upstash-callback-workflow-init": "false", "upstash-callback-workflow-runid": workflowRunId, "upstash-callback-workflow-url": "https://requestcatcher.com/api", - "upstash-failure-callback-retries": "3", "upstash-feature-set": "WF_NoDelete,InitialBody", "upstash-forward-authorization": `Bearer ${openaiToken}`, "upstash-forward-content-type": "application/json", @@ -110,6 +108,7 @@ describe("agents", () => { "upstash-workflow-init": "false", "upstash-workflow-runid": workflowRunId, "upstash-workflow-url": "https://requestcatcher.com/api", + "upstash-callback-retries": "5", }, }, ], @@ -120,7 +119,6 @@ describe("agents", () => { test("should convert agent to tool", async () => { const agentTool = agent.asTool(); - // @ts-expect-error description exists but isn't accessible expect(agentTool.description).toBe( "An AI Agent with the following background: an agentHas access to the following tools: ai sdk tool" ); @@ -160,13 +158,11 @@ describe("agents", () => { "upstash-callback-forward-upstash-workflow-stepid": "2", "upstash-callback-forward-upstash-workflow-stepname": "Call Agent my agent", "upstash-callback-forward-upstash-workflow-steptype": "Call", - "upstash-callback-forward-upstash-workflow-invoke-count": "0", - "upstash-callback-retries": "3", + "upstash-callback-retries": "5", "upstash-callback-workflow-calltype": "fromCallback", "upstash-callback-workflow-init": "false", "upstash-callback-workflow-runid": workflowRunId, "upstash-callback-workflow-url": "https://requestcatcher.com/api", - "upstash-failure-callback-retries": "3", "upstash-feature-set": "WF_NoDelete,InitialBody", "upstash-forward-authorization": `Bearer ${openaiToken}`, "upstash-forward-content-type": "application/json", @@ -225,13 +221,11 @@ describe("agents", () => { "upstash-callback-forward-upstash-workflow-stepid": "3", "upstash-callback-forward-upstash-workflow-stepname": "Call Agent manager llm", "upstash-callback-forward-upstash-workflow-steptype": "Call", - "upstash-callback-forward-upstash-workflow-invoke-count": "0", - "upstash-callback-retries": "3", + "upstash-callback-retries": "5", "upstash-callback-workflow-calltype": "fromCallback", "upstash-callback-workflow-init": "false", "upstash-callback-workflow-runid": workflowRunId, "upstash-callback-workflow-url": "https://requestcatcher.com/api", - "upstash-failure-callback-retries": "3", "upstash-feature-set": "WF_NoDelete,InitialBody", "upstash-forward-authorization": `Bearer ${openaiToken}`, "upstash-forward-content-type": "application/json", diff --git a/src/agents/agent.ts b/src/agents/agent.ts index 8c75377..ca651a4 100644 --- a/src/agents/agent.ts +++ b/src/agents/agent.ts @@ -93,7 +93,6 @@ export class Agent { */ public asTool(): AISDKTool { const toolDescriptions = Object.values(this.tools) - // @ts-expect-error description exists but can't be resolved .map((tool) => tool.description) .join("\n"); return tool({ diff --git a/src/agents/task.test.ts b/src/agents/task.test.ts index f830490..baa6062 100644 --- a/src/agents/task.test.ts +++ b/src/agents/task.test.ts @@ -118,13 +118,10 @@ describe("tasks", () => { "upstash-callback-forward-upstash-workflow-stepid": "1", "upstash-callback-forward-upstash-workflow-stepname": "Call Agent my agent", "upstash-callback-forward-upstash-workflow-steptype": "Call", - "upstash-callback-forward-upstash-workflow-invoke-count": "0", - "upstash-callback-retries": "3", "upstash-callback-workflow-calltype": "fromCallback", "upstash-callback-workflow-init": "false", "upstash-callback-workflow-runid": workflowRunId, "upstash-callback-workflow-url": "https://requestcatcher.com/api", - "upstash-failure-callback-retries": "3", "upstash-feature-set": "WF_NoDelete,InitialBody", "upstash-forward-authorization": `Bearer ${openaiToken}`, "upstash-forward-content-type": "application/json", @@ -199,13 +196,10 @@ describe("tasks", () => { "upstash-callback-forward-upstash-workflow-stepid": "1", "upstash-callback-forward-upstash-workflow-stepname": "Call Agent Manager LLM", "upstash-callback-forward-upstash-workflow-steptype": "Call", - "upstash-callback-forward-upstash-workflow-invoke-count": "0", - "upstash-callback-retries": "3", "upstash-callback-workflow-calltype": "fromCallback", "upstash-callback-workflow-init": "false", "upstash-callback-workflow-runid": workflowRunId, "upstash-callback-workflow-url": "https://requestcatcher.com/api", - "upstash-failure-callback-retries": "3", "upstash-feature-set": "WF_NoDelete,InitialBody", "upstash-forward-authorization": `Bearer ${customApiKey}`, "upstash-forward-content-type": "application/json", @@ -271,13 +265,10 @@ describe("tasks", () => { "upstash-callback-forward-upstash-workflow-contenttype": "application/json", "upstash-callback-forward-upstash-workflow-stepid": "1", "upstash-callback-forward-upstash-workflow-steptype": "Call", - "upstash-callback-forward-upstash-workflow-invoke-count": "0", - "upstash-callback-retries": "3", "upstash-callback-workflow-calltype": "fromCallback", "upstash-callback-workflow-init": "false", "upstash-callback-workflow-runid": workflowRunId, "upstash-callback-workflow-url": "https://requestcatcher.com/api", - "upstash-failure-callback-retries": "3", "upstash-feature-set": "WF_NoDelete,InitialBody", "upstash-forward-content-type": "application/json", "upstash-forward-upstash-agent-name": "my agent", diff --git a/src/context/auto-executor.test.ts b/src/context/auto-executor.test.ts index d10f9ec..a1a7565 100644 --- a/src/context/auto-executor.test.ts +++ b/src/context/auto-executor.test.ts @@ -91,6 +91,12 @@ describe("auto-executor", () => { headers: new Headers({}) as Headers, steps, url: WORKFLOW_ENDPOINT, + retries: 6, + flowControl: { + key: "key", + parallelism: 10, + }, + invokeCount: 7, }); }; @@ -124,13 +130,14 @@ describe("auto-executor", () => { "content-type": "application/json", "upstash-feature-set": "LazyFetch,InitialBody", "upstash-forward-upstash-workflow-sdk-version": "1", - "upstash-forward-upstash-workflow-invoke-count": "0", "upstash-method": "POST", - "upstash-retries": "3", - "upstash-failure-callback-retries": "3", "upstash-workflow-runid": workflowRunId, "upstash-workflow-init": "false", "upstash-workflow-url": WORKFLOW_ENDPOINT, + "upstash-retries": "6", + "upstash-flow-control-key": "key", + "upstash-flow-control-value": "parallelism=10", + "upstash-forward-upstash-workflow-invoke-count": "7", }, body: JSON.stringify({ ...singleStep, @@ -220,13 +227,14 @@ describe("auto-executor", () => { "upstash-feature-set": "LazyFetch,InitialBody", "upstash-delay": "123s", "upstash-forward-upstash-workflow-sdk-version": "1", - "upstash-forward-upstash-workflow-invoke-count": "0", "upstash-method": "POST", - "upstash-retries": "3", - "upstash-failure-callback-retries": "3", "upstash-workflow-runid": workflowRunId, "upstash-workflow-init": "false", "upstash-workflow-url": WORKFLOW_ENDPOINT, + "upstash-retries": "6", + "upstash-flow-control-key": "key", + "upstash-flow-control-value": "parallelism=10", + "upstash-forward-upstash-workflow-invoke-count": "7", }, }, { @@ -238,13 +246,14 @@ describe("auto-executor", () => { "upstash-feature-set": "LazyFetch,InitialBody", "upstash-delay": "10m", "upstash-forward-upstash-workflow-sdk-version": "1", - "upstash-forward-upstash-workflow-invoke-count": "0", "upstash-method": "POST", - "upstash-retries": "3", - "upstash-failure-callback-retries": "3", "upstash-workflow-runid": workflowRunId, "upstash-workflow-init": "false", "upstash-workflow-url": WORKFLOW_ENDPOINT, + "upstash-retries": "6", + "upstash-flow-control-key": "key", + "upstash-flow-control-value": "parallelism=10", + "upstash-forward-upstash-workflow-invoke-count": "7", }, }, { @@ -255,14 +264,15 @@ describe("auto-executor", () => { "content-type": "application/json", "upstash-feature-set": "LazyFetch,InitialBody", "upstash-forward-upstash-workflow-sdk-version": "1", - "upstash-forward-upstash-workflow-invoke-count": "0", "upstash-method": "POST", - "upstash-retries": "3", - "upstash-failure-callback-retries": "3", "upstash-not-before": "123123", "upstash-workflow-runid": workflowRunId, "upstash-workflow-init": "false", "upstash-workflow-url": WORKFLOW_ENDPOINT, + "upstash-retries": "6", + "upstash-flow-control-key": "key", + "upstash-flow-control-value": "parallelism=10", + "upstash-forward-upstash-workflow-invoke-count": "7", }, }, { @@ -273,14 +283,14 @@ describe("auto-executor", () => { "content-type": "application/json", "upstash-feature-set": "LazyFetch,InitialBody", "upstash-forward-upstash-workflow-sdk-version": "1", - "upstash-forward-upstash-workflow-invoke-count": "0", "upstash-method": "POST", - "upstash-retries": "3", - "upstash-failure-callback-retries": "3", - "upstash-workflow-calltype": "step", "upstash-workflow-runid": workflowRunId, "upstash-workflow-init": "false", "upstash-workflow-url": WORKFLOW_ENDPOINT, + "upstash-retries": "6", + "upstash-flow-control-key": "key", + "upstash-flow-control-value": "parallelism=10", + "upstash-forward-upstash-workflow-invoke-count": "7", }, }, ], @@ -334,13 +344,14 @@ describe("auto-executor", () => { "content-type": "application/json", "upstash-feature-set": "LazyFetch,InitialBody", "upstash-forward-upstash-workflow-sdk-version": "1", - "upstash-forward-upstash-workflow-invoke-count": "0", "upstash-method": "POST", - "upstash-retries": "3", - "upstash-failure-callback-retries": "3", "upstash-workflow-runid": workflowRunId, "upstash-workflow-init": "false", "upstash-workflow-url": WORKFLOW_ENDPOINT, + "upstash-retries": "6", + "upstash-flow-control-key": "key", + "upstash-flow-control-value": "parallelism=10", + "upstash-forward-upstash-workflow-invoke-count": "7", }, body: JSON.stringify(parallelSteps[2]), }, @@ -390,13 +401,14 @@ describe("auto-executor", () => { "content-type": "application/json", "upstash-feature-set": "LazyFetch,InitialBody", "upstash-forward-upstash-workflow-sdk-version": "1", - "upstash-forward-upstash-workflow-invoke-count": "0", "upstash-method": "POST", - "upstash-retries": "3", - "upstash-failure-callback-retries": "3", "upstash-workflow-runid": workflowRunId, "upstash-workflow-init": "false", "upstash-workflow-url": WORKFLOW_ENDPOINT, + "upstash-retries": "6", + "upstash-flow-control-key": "key", + "upstash-flow-control-value": "parallelism=10", + "upstash-forward-upstash-workflow-invoke-count": "7", }, body: JSON.stringify(parallelSteps[3]), }, diff --git a/src/context/auto-executor.ts b/src/context/auto-executor.ts index 070ef0b..4da00a4 100644 --- a/src/context/auto-executor.ts +++ b/src/context/auto-executor.ts @@ -1,12 +1,10 @@ import { WorkflowAbort, WorkflowError } from "../error"; import type { WorkflowContext } from "./context"; -import type { StepFunction, ParallelCallState, Step, WaitRequest, Telemetry } from "../types"; -import { LazyCallStep, LazyInvokeStep, type BaseLazyStep } from "./steps"; -import { getHeaders } from "../workflow-requests"; +import type { StepFunction, ParallelCallState, Step, Telemetry } from "../types"; +import { type BaseLazyStep } from "./steps"; import type { WorkflowLogger } from "../logger"; -import { NO_CONCURRENCY } from "../constants"; import { QstashError } from "@upstash/qstash"; -import { invokeWorkflow } from "../serve/serve-many"; +import { submitParallelSteps, submitSingleStep } from "../qstash/submit-steps"; export class AutoExecutor { private context: WorkflowContext; @@ -62,7 +60,7 @@ export class AutoExecutor { if (this.executingStep) { throw new WorkflowError( "A step can not be run inside another step." + - ` Tried to run '${stepInfo.stepName}' inside '${this.executingStep}'` + ` Tried to run '${stepInfo.stepName}' inside '${this.executingStep}'` ); } @@ -126,7 +124,7 @@ export class AutoExecutor { * @param lazyStep lazy step to execute * @returns step result */ - protected async runSingle(lazyStep: BaseLazyStep) { + protected async runSingle(lazyStep: BaseLazyStep): Promise { if (this.stepCount < this.nonPlanStepCount) { const step = this.steps[this.stepCount + this.planStepCount]; validateStep(lazyStep, step); @@ -138,16 +136,16 @@ export class AutoExecutor { return lazyStep.parseOut(step.out); } - const resultStep = await lazyStep.getResultStep(NO_CONCURRENCY, this.stepCount); - - await this.debug?.log("INFO", "RUN_SINGLE", { - fromRequest: false, - step: resultStep, - stepCount: this.stepCount, + const resultStep = await submitSingleStep({ + context: this.context, + lazyStep, + stepId: this.stepCount, + invokeCount: this.invokeCount, + concurrency: 1, + telemetry: this.telemetry, + debug: this.debug, }); - await this.submitStepsToQStash([resultStep], [lazyStep]); - - return resultStep.out as TResult; + throw new WorkflowAbort(lazyStep.stepName, resultStep); } /** @@ -175,7 +173,7 @@ export class AutoExecutor { // user has added/removed a parallel step throw new WorkflowError( `Incompatible number of parallel steps when call state was '${parallelCallState}'.` + - ` Expected ${parallelSteps.length}, got ${plannedParallelStepCount} from the request.` + ` Expected ${parallelSteps.length}, got ${plannedParallelStepCount} from the request.` ); } @@ -189,14 +187,14 @@ export class AutoExecutor { switch (parallelCallState) { case "first": { - /** - * Encountering a parallel step for the first time, create plan steps for each parallel step - * and send them to QStash. QStash will call us back parallelSteps.length many times - */ - const planSteps = parallelSteps.map((parallelStep, index) => - parallelStep.getPlanStep(parallelSteps.length, initialStepCount + index) - ); - await this.submitStepsToQStash(planSteps, parallelSteps); + await submitParallelSteps({ + context: this.context, + steps: parallelSteps, + initialStepCount, + invokeCount: this.invokeCount, + telemetry: this.telemetry, + debug: this.debug + }); break; } case "partial": { @@ -210,7 +208,7 @@ export class AutoExecutor { if (!planStep || planStep.targetStep === undefined) { throw new WorkflowError( `There must be a last step and it should have targetStep larger than 0.` + - `Received: ${JSON.stringify(planStep)}` + `Received: ${JSON.stringify(planStep)}` ); } const stepIndex = planStep.targetStep - initialStepCount; @@ -226,11 +224,16 @@ export class AutoExecutor { validateStep(parallelSteps[stepIndex], planStep); try { const parallelStep = parallelSteps[stepIndex]; - const resultStep = await parallelStep.getResultStep( - parallelSteps.length, - planStep.targetStep - ); - await this.submitStepsToQStash([resultStep], [parallelStep]); + const resultStep = await submitSingleStep({ + context: this.context, + lazyStep: parallelStep, + stepId: planStep.targetStep, + invokeCount: this.invokeCount, + concurrency: parallelSteps.length, + telemetry: this.telemetry, + debug: this.debug, + }); + throw new WorkflowAbort(parallelStep.stepName, resultStep); } catch (error) { if ( error instanceof WorkflowAbort || @@ -324,148 +327,6 @@ export class AutoExecutor { } } - /** - * sends the steps to QStash as batch - * - * @param steps steps to send - */ - private async submitStepsToQStash(steps: Step[], lazySteps: BaseLazyStep[]) { - // if there are no steps, something went wrong. Raise exception - if (steps.length === 0) { - throw new WorkflowError( - `Unable to submit steps to QStash. Provided list is empty. Current step: ${this.stepCount}` - ); - } - - await this.debug?.log("SUBMIT", "SUBMIT_STEP", { - length: steps.length, - steps, - }); - - // must check length to be 1, otherwise was the if would return - // true for plan steps. - if (steps[0].waitEventId && steps.length === 1) { - const waitStep = steps[0]; - - const { headers, timeoutHeaders } = getHeaders({ - initHeaderValue: "false", - workflowRunId: this.context.workflowRunId, - workflowUrl: this.context.url, - userHeaders: this.context.headers, - step: waitStep, - failureUrl: this.context.failureUrl, - retries: this.context.retries, - telemetry: this.telemetry, - invokeCount: this.invokeCount, - flowControl: this.context.flowControl, - }); - - // call wait - const waitBody: WaitRequest = { - url: this.context.url, - timeout: waitStep.timeout, - timeoutBody: undefined, - timeoutUrl: this.context.url, - timeoutHeaders, - step: { - stepId: waitStep.stepId, - stepType: "Wait", - stepName: waitStep.stepName, - concurrent: waitStep.concurrent, - targetStep: waitStep.targetStep, - }, - }; - - await this.context.qstashClient.http.request({ - path: ["v2", "wait", waitStep.waitEventId], - body: JSON.stringify(waitBody), - headers, - method: "POST", - parseResponseAsJson: false, - }); - - throw new WorkflowAbort(waitStep.stepName, waitStep); - } - // must check length to be 1, otherwise was the if would return - // true for plan steps. - if (steps.length === 1 && lazySteps[0] instanceof LazyInvokeStep) { - const invokeStep = steps[0]; - const lazyInvokeStep = lazySteps[0]; - await invokeWorkflow({ - settings: lazyInvokeStep.params, - invokeStep, - context: this.context, - invokeCount: this.invokeCount, - telemetry: this.telemetry, - }); - - throw new WorkflowAbort(invokeStep.stepName, invokeStep); - } - - const result = await this.context.qstashClient.batch( - steps.map((singleStep, index) => { - const lazyStep = lazySteps[index]; - const { headers } = getHeaders({ - initHeaderValue: "false", - workflowRunId: this.context.workflowRunId, - workflowUrl: this.context.url, - userHeaders: this.context.headers, - step: singleStep, - failureUrl: this.context.failureUrl, - retries: this.context.retries, - callRetries: lazyStep instanceof LazyCallStep ? lazyStep.retries : undefined, - callTimeout: lazyStep instanceof LazyCallStep ? lazyStep.timeout : undefined, - telemetry: this.telemetry, - invokeCount: this.invokeCount, - flowControl: this.context.flowControl, - callFlowControl: lazyStep instanceof LazyCallStep ? lazyStep.flowControl : undefined, - }); - - // if the step is a single step execution or a plan step, we can add sleep headers - const willWait = singleStep.concurrent === NO_CONCURRENCY || singleStep.stepId === 0; - - singleStep.out = JSON.stringify(singleStep.out); - - return singleStep.callUrl && lazyStep instanceof LazyCallStep - ? // if the step is a third party call, we call the third party - // url (singleStep.callUrl) and pass information about the workflow - // in the headers (handled in getHeaders). QStash makes the request - // to callUrl and returns the result to Workflow endpoint. - // handleThirdPartyCallResult method sends the result of the third - // party call to QStash. - { - headers, - method: singleStep.callMethod, - body: JSON.stringify(singleStep.callBody), - url: singleStep.callUrl, - } - : // if the step is not a third party call, we use workflow - // endpoint (context.url) as URL when calling QStash. QStash - // calls us back with the updated steps list. - { - headers, - method: "POST", - body: JSON.stringify(singleStep), - url: this.context.url, - notBefore: willWait ? singleStep.sleepUntil : undefined, - delay: willWait ? singleStep.sleepFor : undefined, - }; - }) - ); - - const _result = result as { messageId: string }[]; - await this.debug?.log("INFO", "SUBMIT_STEP", { - messageIds: _result.map((message) => { - return { - message: message.messageId, - }; - }), - }); - - // if the steps are sent successfully, abort to stop the current request - throw new WorkflowAbort(steps[0].stepName, steps[0]); - } - /** * Get the promise by executing the lazt steps list. If there is a single * step, we call `runSingle`. Otherwise `runParallel` is called. @@ -523,14 +384,14 @@ const validateStep = (lazyStep: BaseLazyStep, stepFromRequest: Step): void => { if (lazyStep.stepName !== stepFromRequest.stepName) { throw new WorkflowError( `Incompatible step name. Expected '${lazyStep.stepName}',` + - ` got '${stepFromRequest.stepName}' from the request` + ` got '${stepFromRequest.stepName}' from the request` ); } // check type name if (lazyStep.stepType !== stepFromRequest.stepType) { throw new WorkflowError( `Incompatible step type. Expected '${lazyStep.stepType}',` + - ` got '${stepFromRequest.stepType}' from the request` + ` got '${stepFromRequest.stepType}' from the request` ); } }; @@ -558,10 +419,10 @@ const validateParallelSteps = (lazySteps: BaseLazyStep[], stepsFromRequest: Step const requestStepTypes = stepsFromRequest.map((step) => step.stepType); throw new WorkflowError( `Incompatible steps detected in parallel execution: ${error.message}` + - `\n > Step Names from the request: ${JSON.stringify(requestStepNames)}` + - `\n Step Types from the request: ${JSON.stringify(requestStepTypes)}` + - `\n > Step Names expected: ${JSON.stringify(lazyStepNames)}` + - `\n Step Types expected: ${JSON.stringify(lazyStepTypes)}` + `\n > Step Names from the request: ${JSON.stringify(requestStepNames)}` + + `\n Step Types from the request: ${JSON.stringify(requestStepTypes)}` + + `\n > Step Names expected: ${JSON.stringify(lazyStepNames)}` + + `\n Step Types expected: ${JSON.stringify(lazyStepTypes)}` ); } throw error; diff --git a/src/context/context.test.ts b/src/context/context.test.ts index 377134a..d83bda8 100644 --- a/src/context/context.test.ts +++ b/src/context/context.test.ts @@ -147,7 +147,6 @@ describe("context tests", () => { "upstash-forward-upstash-workflow-invoke-count": "5", "upstash-method": "POST", "upstash-retries": "2", - "upstash-failure-callback-retries": "2", "upstash-workflow-init": "false", "upstash-workflow-runid": "wfr-id", "upstash-workflow-url": WORKFLOW_ENDPOINT, @@ -197,8 +196,6 @@ describe("context tests", () => { "Upstash-Feature-Set": ["LazyFetch,InitialBody"], [`Upstash-Forward-${WORKFLOW_PROTOCOL_VERSION_HEADER}`]: ["1"], [WORKFLOW_PROTOCOL_VERSION_HEADER]: [WORKFLOW_PROTOCOL_VERSION], - "Upstash-Retries": ["3"], - "Upstash-Failure-Callback-Retries": ["3"], "Upstash-Workflow-CallType": ["step"], [WORKFLOW_INIT_HEADER]: ["false"], [WORKFLOW_ID_HEADER]: ["wfr-id"], @@ -253,9 +250,6 @@ describe("context tests", () => { "upstash-forward-upstash-workflow-sdk-version": "1", "upstash-forward-upstash-workflow-invoke-count": "1", "upstash-method": "POST", - "upstash-retries": "3", - "upstash-failure-callback-retries": "3", - "upstash-workflow-calltype": "step", "upstash-workflow-init": "false", "upstash-workflow-runid": "wfr-id", "upstash-workflow-url": WORKFLOW_ENDPOINT, @@ -271,8 +265,6 @@ describe("context tests", () => { "upstash-forward-upstash-workflow-sdk-version": "1", "upstash-forward-upstash-workflow-invoke-count": "1", "upstash-method": "POST", - "upstash-retries": "3", - "upstash-failure-callback-retries": "3", "upstash-workflow-init": "false", "upstash-workflow-runid": "wfr-id", "upstash-workflow-url": WORKFLOW_ENDPOINT, @@ -337,15 +329,14 @@ describe("context tests", () => { "upstash-callback-forward-upstash-workflow-contenttype": "application/x-www-form-urlencoded", "upstash-callback-forward-upstash-workflow-invoke-count": "7", + "upstash-forward-upstash-workflow-invoke-count": "7", "upstash-callback-forward-upstash-workflow-stepid": "1", "upstash-callback-forward-upstash-workflow-stepname": "my-step", "upstash-callback-forward-upstash-workflow-steptype": "Call", - "upstash-callback-retries": "3", "upstash-callback-workflow-calltype": "fromCallback", "upstash-callback-workflow-init": "false", "upstash-callback-workflow-runid": "wfr-id", "upstash-callback-workflow-url": WORKFLOW_ENDPOINT, - "upstash-failure-callback-retries": "3", "upstash-feature-set": "WF_NoDelete,InitialBody", "upstash-forward-my-header": "my-value", "upstash-method": "PATCH", @@ -402,16 +393,13 @@ describe("context tests", () => { "upstash-callback-forward-upstash-workflow-callback": "true", "upstash-callback-forward-upstash-workflow-concurrent": "1", "upstash-callback-forward-upstash-workflow-contenttype": "application/json", - "upstash-callback-forward-upstash-workflow-invoke-count": "0", "upstash-callback-forward-upstash-workflow-stepid": "1", "upstash-callback-forward-upstash-workflow-stepname": "my-step", "upstash-callback-forward-upstash-workflow-steptype": "Call", - "upstash-callback-retries": "3", "upstash-callback-workflow-calltype": "fromCallback", "upstash-callback-workflow-init": "false", "upstash-callback-workflow-runid": "wfr-id", "upstash-callback-workflow-url": WORKFLOW_ENDPOINT, - "upstash-failure-callback-retries": "3", "upstash-feature-set": "WF_NoDelete,InitialBody", "upstash-forward-my-header": "my-value", "upstash-method": "PATCH", @@ -569,7 +557,6 @@ describe("context tests", () => { "upstash-callback-forward-upstash-workflow-callback": "true", "upstash-callback-forward-upstash-workflow-concurrent": "1", "upstash-callback-forward-upstash-workflow-contenttype": "application/json", - "upstash-callback-forward-upstash-workflow-invoke-count": "0", "upstash-callback-forward-upstash-workflow-stepid": "1", "upstash-callback-forward-upstash-workflow-stepname": stepName, "upstash-callback-forward-upstash-workflow-steptype": "Call", @@ -578,7 +565,6 @@ describe("context tests", () => { "upstash-callback-workflow-init": "false", "upstash-callback-workflow-runid": "wfr-id", "upstash-callback-workflow-url": WORKFLOW_ENDPOINT, - "upstash-failure-callback-retries": "2", "upstash-feature-set": "WF_NoDelete,InitialBody", "upstash-forward-authorization": `Bearer ${openAIToken}`, "upstash-forward-content-type": "application/json", @@ -604,6 +590,7 @@ describe("context tests", () => { headers: new Headers() as Headers, workflowRunId: "wfr-id", retries: 2, + invokeCount: 5, }); const openAIToken = `hello-there`; @@ -653,7 +640,8 @@ describe("context tests", () => { "upstash-callback-forward-upstash-workflow-callback": "true", "upstash-callback-forward-upstash-workflow-concurrent": "1", "upstash-callback-forward-upstash-workflow-contenttype": "application/json", - "upstash-callback-forward-upstash-workflow-invoke-count": "0", + "upstash-callback-forward-upstash-workflow-invoke-count": "5", + "upstash-forward-upstash-workflow-invoke-count": "5", "upstash-callback-forward-upstash-workflow-stepid": "1", "upstash-callback-forward-upstash-workflow-stepname": stepName, "upstash-callback-forward-upstash-workflow-steptype": "Call", @@ -662,7 +650,6 @@ describe("context tests", () => { "upstash-callback-workflow-init": "false", "upstash-callback-workflow-runid": "wfr-id", "upstash-callback-workflow-url": WORKFLOW_ENDPOINT, - "upstash-failure-callback-retries": "2", "upstash-feature-set": "WF_NoDelete,InitialBody", "upstash-forward-authorization": `Bearer ${openAIToken}`, "upstash-forward-content-type": "application/json", @@ -734,7 +721,6 @@ describe("context tests", () => { "upstash-callback-forward-upstash-workflow-callback": "true", "upstash-callback-forward-upstash-workflow-concurrent": "1", "upstash-callback-forward-upstash-workflow-contenttype": "application/json", - "upstash-callback-forward-upstash-workflow-invoke-count": "0", "upstash-callback-forward-upstash-workflow-stepid": "1", "upstash-callback-forward-upstash-workflow-stepname": stepName, "upstash-callback-forward-upstash-workflow-steptype": "Call", @@ -743,7 +729,6 @@ describe("context tests", () => { "upstash-callback-workflow-init": "false", "upstash-callback-workflow-runid": "wfr-id", "upstash-callback-workflow-url": WORKFLOW_ENDPOINT, - "upstash-failure-callback-retries": "2", "upstash-feature-set": "WF_NoDelete,InitialBody", "upstash-forward-authorization": `Bearer ${resendToken}`, "upstash-forward-content-type": "application/json", @@ -769,6 +754,7 @@ describe("context tests", () => { headers: new Headers() as Headers, workflowRunId: "wfr-id", retries: 2, + invokeCount: 3, }); const anthropicToken = `hello-there`; @@ -825,7 +811,8 @@ describe("context tests", () => { "upstash-callback-forward-upstash-workflow-callback": "true", "upstash-callback-forward-upstash-workflow-concurrent": "1", "upstash-callback-forward-upstash-workflow-contenttype": "application/json", - "upstash-callback-forward-upstash-workflow-invoke-count": "0", + "upstash-callback-forward-upstash-workflow-invoke-count": "3", + "upstash-forward-upstash-workflow-invoke-count": "3", "upstash-callback-forward-upstash-workflow-stepid": "1", "upstash-callback-forward-upstash-workflow-stepname": stepName, "upstash-callback-forward-upstash-workflow-steptype": "Call", @@ -834,7 +821,6 @@ describe("context tests", () => { "upstash-callback-workflow-init": "false", "upstash-callback-workflow-runid": "wfr-id", "upstash-callback-workflow-url": WORKFLOW_ENDPOINT, - "upstash-failure-callback-retries": "2", "upstash-feature-set": "WF_NoDelete,InitialBody", "upstash-forward-x-api-key": anthropicToken, "upstash-forward-anthropic-version": "2023-06-01", diff --git a/src/context/steps.ts b/src/context/steps.ts index 301560c..33b1cfd 100644 --- a/src/context/steps.ts +++ b/src/context/steps.ts @@ -1,19 +1,33 @@ import type { Client, FlowControl, HTTPMethods } from "@upstash/qstash"; import type { CallResponse, + HeaderParams, InvokeStepResponse, + InvokeWorkflowRequest, LazyInvokeStepParams, NotifyStepResponse, RequiredExceptFields, Step, StepFunction, StepType, + WaitRequest, WaitStepResponse, } from "../types"; import { makeNotifyRequest } from "../client/utils"; import type { Duration } from "../types"; import { WorkflowError } from "../error"; import { getWorkflowRunId } from "../utils"; +import { WorkflowContext } from "./context"; +import { getHeaders, prepareFlowControl } from "../qstash/headers"; +import { WORKFLOW_FEATURE_HEADER, WORKFLOW_INIT_HEADER, WORKFLOW_URL_HEADER } from "../constants"; +import { getTelemetryHeaders, HeadersResponse } from "../workflow-requests"; + +type StepParams = { context: WorkflowContext } & Pick & + Required>; +type GetHeaderParams = StepParams; +type GetBodyParams = StepParams & Omit; +type SubmitStepParams = StepParams & + Pick & { body: string; isParallel: boolean }; /** * Base class outlining steps. Basically, each step kind (run/sleep/sleepUntil) @@ -109,6 +123,43 @@ export abstract class BaseLazyStep { return stepOut; } } + + getBody({ step }: GetBodyParams): string { + step.out = JSON.stringify(step.out); + return JSON.stringify(step); + } + + getHeaders({ context, telemetry, invokeCount, step }: GetHeaderParams): HeadersResponse { + return getHeaders({ + initHeaderValue: "false", + workflowConfig: { + workflowRunId: context.workflowRunId, + workflowUrl: context.url, + failureUrl: context.failureUrl, + retries: context.retries, + useJSONContent: false, + telemetry, + flowControl: context.flowControl, + }, + userHeaders: context.headers, + invokeCount, + stepInfo: { + step, + lazyStep: this, + }, + }); + } + + async submitStep({ context, body, headers }: SubmitStepParams) { + return await context.qstashClient.batch([ + { + body, + headers, + method: "POST", + url: context.url, + }, + ]) as { messageId: string }[]; + } } /** @@ -183,6 +234,18 @@ export class LazySleepStep extends BaseLazyStep { concurrent, }); } + + async submitStep({ context, body, headers, isParallel }: SubmitStepParams) { + return await context.qstashClient.batch([ + { + body, + headers, + method: "POST", + url: context.url, + delay: isParallel ? undefined : this.sleep, + }, + ]) as { messageId: string }[] + } } /** @@ -222,6 +285,18 @@ export class LazySleepUntilStep extends BaseLazyStep { protected safeParseOut() { return undefined; } + + async submitStep({ context, body, headers, isParallel }: SubmitStepParams) { + return await context.qstashClient.batch([ + { + body, + headers, + method: "POST", + url: context.url, + notBefore: isParallel ? undefined : this.sleepUntil, + }, + ]) as { messageId: string }[]; + } } export class LazyCallStep extends BaseLazyStep< @@ -230,7 +305,7 @@ export class LazyCallStep extends BaseLazySt private readonly url: string; private readonly method: HTTPMethods; private readonly body: TBody; - private readonly headers: Record; + public readonly headers: Record; public readonly retries: number; public readonly timeout?: number | Duration; public readonly flowControl?: FlowControl; @@ -333,6 +408,70 @@ export class LazyCallStep extends BaseLazySt } return false; }; + + public getBody({ step }: GetBodyParams): string { + if (!step.callUrl) { + throw new WorkflowError("Incompatible step received in LazyCallStep.getBody"); + } + + return JSON.stringify(step.callBody); + } + + getHeaders({ context, telemetry, invokeCount, step }: GetHeaderParams): HeadersResponse { + const { headers, contentType } = super.getHeaders({ context, telemetry, invokeCount, step }); + + headers["Upstash-Retries"] = this.retries.toString(); + headers[WORKFLOW_FEATURE_HEADER] = "WF_NoDelete,InitialBody"; + + if (this.flowControl) { + const { flowControlKey, flowControlValue } = prepareFlowControl(this.flowControl); + + headers["Upstash-Flow-Control-Key"] = flowControlKey; + headers["Upstash-Flow-Control-Value"] = flowControlValue; + } + + if (this.timeout) { + headers["Upstash-Timeout"] = this.timeout.toString(); + } + + const forwardedHeaders = Object.fromEntries( + Object.entries(this.headers).map(([header, value]) => [`Upstash-Forward-${header}`, value]) + ); + + return { + headers: { + ...headers, + ...forwardedHeaders, + + "Upstash-Callback": context.url, + "Upstash-Callback-Workflow-RunId": context.workflowRunId, + "Upstash-Callback-Workflow-CallType": "fromCallback", + "Upstash-Callback-Workflow-Init": "false", + "Upstash-Callback-Workflow-Url": context.url, + "Upstash-Callback-Feature-Set": "LazyFetch,InitialBody", + + "Upstash-Callback-Forward-Upstash-Workflow-Callback": "true", + "Upstash-Callback-Forward-Upstash-Workflow-StepId": step.stepId.toString(), + "Upstash-Callback-Forward-Upstash-Workflow-StepName": this.stepName, + "Upstash-Callback-Forward-Upstash-Workflow-StepType": this.stepType, + "Upstash-Callback-Forward-Upstash-Workflow-Concurrent": step.concurrent.toString(), + "Upstash-Callback-Forward-Upstash-Workflow-ContentType": contentType, + "Upstash-Workflow-CallType": "toCallback", + }, + contentType, + }; + } + + async submitStep({ context, headers }: SubmitStepParams) { + return await context.qstashClient.batch([ + { + headers, + body: JSON.stringify(this.body), + method: this.method, + url: this.url, + }, + ]) as { messageId: string }[]; + } } export class LazyWaitForEventStep extends BaseLazyStep { @@ -381,6 +520,66 @@ export class LazyWaitForEventStep extends BaseLazyStep { eventData: BaseLazyStep.tryParsing(result.eventData), }; } + + public getHeaders({ context, telemetry, invokeCount, step }: GetHeaderParams): HeadersResponse { + const headers = super.getHeaders({ context, telemetry, invokeCount, step }); + headers.headers["Upstash-Workflow-CallType"] = "step"; + return headers; + } + + public getBody({ context, step, headers, telemetry }: GetBodyParams): string { + if (!step.waitEventId) { + throw new WorkflowError("Incompatible step received in LazyWaitForEventStep.getBody"); + } + + const timeoutHeaders = { + // to include user headers: + ...Object.fromEntries(Object.entries(headers).map(([header, value]) => [header, [value]])), + // to include telemetry headers: + ...(telemetry + ? Object.fromEntries( + Object.entries(getTelemetryHeaders(telemetry)).map(([header, value]) => [ + header, + [value], + ]) + ) + : {}), + + // note: using WORKFLOW_ID_HEADER doesn't work, because Runid -> RunId: + "Upstash-Workflow-Runid": [context.workflowRunId], + [WORKFLOW_INIT_HEADER]: ["false"], + [WORKFLOW_URL_HEADER]: [context.url], + "Upstash-Workflow-CallType": ["step"], + }; + + const waitBody: WaitRequest = { + url: context.url, + timeout: step.timeout, + timeoutBody: undefined, + timeoutUrl: context.url, + timeoutHeaders, + step: { + stepId: step.stepId, + stepType: "Wait", + stepName: step.stepName, + concurrent: step.concurrent, + targetStep: step.targetStep, + }, + }; + + return JSON.stringify(waitBody); + } + + async submitStep({ context, body, headers }: SubmitStepParams) { + const result = await context.qstashClient.http.request({ + path: ["v2", "wait", this.eventId], + body: body, + headers, + method: "POST", + parseResponseAsJson: false, + }) as { messageId: string }; + return [result] + } } export class LazyNotifyStep extends LazyFunctionStep { @@ -413,6 +612,10 @@ export class LazyInvokeStep extends BaseLazy stepType: StepType = "Invoke"; params: RequiredExceptFields, "retries" | "flowControl">; protected allowUndefinedOut = false; + /** + * workflow id of the invoked workflow + */ + private workflowId: string; constructor( stepName: string, @@ -434,6 +637,12 @@ export class LazyInvokeStep extends BaseLazy retries, flowControl, }; + + const { workflowId } = workflow; + if (!workflowId) { + throw new WorkflowError("You can only invoke workflow which has a workflowId"); + } + this.workflowId = workflowId; } public getPlanStep(concurrent: number, targetStep: number): Step { @@ -469,4 +678,82 @@ export class LazyInvokeStep extends BaseLazy body: BaseLazyStep.tryParsing(result.body), }; } + + public getBody({ context, step, telemetry, invokeCount }: GetBodyParams): string { + const { headers: invokerHeaders } = getHeaders({ + initHeaderValue: "false", + workflowConfig: { + workflowRunId: context.workflowRunId, + workflowUrl: context.url, + failureUrl: context.failureUrl, + retries: context.retries, + telemetry, + flowControl: context.flowControl, + useJSONContent: false, + }, + userHeaders: context.headers, + invokeCount, + }); + invokerHeaders["Upstash-Workflow-Runid"] = context.workflowRunId; + + const request: InvokeWorkflowRequest = { + body: JSON.stringify(this.params.body), + headers: Object.fromEntries( + Object.entries(invokerHeaders).map((pairs) => [pairs[0], [pairs[1]]]) + ), + workflowRunId: context.workflowRunId, + workflowUrl: context.url, + step, + }; + + return JSON.stringify(request); + } + + getHeaders({ context, telemetry, invokeCount }: GetHeaderParams): HeadersResponse { + const { + workflow, + headers = {}, + workflowRunId = getWorkflowRunId(), + retries, + flowControl, + } = this.params; + const newUrl = context.url.replace(/[^/]+$/, this.workflowId); + + const { + retries: workflowRetries, + failureFunction, + failureUrl, + useJSONContent, + flowControl: workflowFlowControl, + } = workflow.options; + + const { headers: triggerHeaders, contentType } = getHeaders({ + initHeaderValue: "true", + workflowConfig: { + workflowRunId: workflowRunId, + workflowUrl: newUrl, + retries: retries ?? workflowRetries, + telemetry, + failureUrl: failureFunction ? newUrl : failureUrl, + flowControl: flowControl ?? workflowFlowControl, + useJSONContent: useJSONContent ?? false, + }, + invokeCount: invokeCount + 1, + userHeaders: new Headers(headers) as Headers, + }); + triggerHeaders["Upstash-Workflow-Invoke"] = "true"; + + return { headers: triggerHeaders, contentType }; + } + + async submitStep({ context, body, headers }: SubmitStepParams) { + const newUrl = context.url.replace(/[^/]+$/, this.workflowId); + const result = await context.qstashClient.publish({ + headers, + method: "POST", + body, + url: newUrl, + }) as { messageId: string }; + return [result]; + } } diff --git a/src/qstash/headers.ts b/src/qstash/headers.ts new file mode 100644 index 0000000..01ea6b6 --- /dev/null +++ b/src/qstash/headers.ts @@ -0,0 +1,267 @@ +import { FlowControl, QstashError } from "@upstash/qstash"; +import { + DEFAULT_CONTENT_TYPE, + DEFAULT_RETRIES, + WORKFLOW_FAILURE_HEADER, + WORKFLOW_FEATURE_HEADER, + WORKFLOW_ID_HEADER, + WORKFLOW_INIT_HEADER, + WORKFLOW_INVOKE_COUNT_HEADER, + WORKFLOW_PROTOCOL_VERSION, + WORKFLOW_PROTOCOL_VERSION_HEADER, + WORKFLOW_URL_HEADER, +} from "../constants"; +import { BaseLazyStep, LazyCallStep } from "../context/steps"; +import { Step, Telemetry } from "../types"; +import { getTelemetryHeaders, HeadersResponse } from "../workflow-requests"; + +export type WorkflowConfig = { + retries?: number; + flowControl?: FlowControl; + failureUrl?: string; + telemetry?: Telemetry; + workflowRunId: string; + workflowUrl: string; + useJSONContent?: boolean; +}; + +/** + * groups the headers with respect to where they should be passed + */ +type WorkflowHeaderGroups = { + /** + * headers which will be returned as they are, without any prefix + */ + rawHeaders: Record; + /** + * headers which should be passed to the workflow endpoint + * + * will be prefixed with `Upstash-` or `Upstash-Callback` depending on the step + */ + workflowHeaders: Record; + /** + * Headers which should be passed to the failure URL + * + * will be prefixed with `Upstash-Failure-Callback-` + */ + failureHeaders: Record; +}; + +type StepInfo = { + step: Step; + lazyStep: BaseLazyStep; +}; + +type WorkflowHeaderParams = { + userHeaders: Headers; + workflowConfig: WorkflowConfig; + invokeCount?: number; + initHeaderValue: "true" | "false"; + stepInfo?: StepInfo; +}; + +class WorkflowHeaders { + private userHeaders: Headers; + private workflowConfig: WorkflowConfig; + private invokeCount?: number; + private initHeaderValue: "true" | "false"; + private stepInfo?: Required; + private headers: WorkflowHeaderGroups; + + constructor({ + userHeaders, + workflowConfig, + invokeCount, + initHeaderValue, + stepInfo, + }: WorkflowHeaderParams) { + this.userHeaders = userHeaders; + this.workflowConfig = workflowConfig; + this.invokeCount = invokeCount; + this.initHeaderValue = initHeaderValue; + this.stepInfo = stepInfo; + this.headers = { + rawHeaders: {}, + workflowHeaders: {}, + failureHeaders: {}, + }; + } + + getHeaders(): HeadersResponse { + this.addBaseHeaders(); + this.addRetries(); + this.addFlowControl(); + this.addUserHeaders(); + this.addInvokeCount(); + this.addFailureUrl(); + const contentType = this.addContentType(); + + return this.prefixHeaders(contentType); + } + + private addBaseHeaders() { + this.headers.rawHeaders = { + ...this.headers.rawHeaders, + [WORKFLOW_INIT_HEADER]: this.initHeaderValue, + [WORKFLOW_ID_HEADER]: this.workflowConfig.workflowRunId, + [WORKFLOW_URL_HEADER]: this.workflowConfig.workflowUrl, + [WORKFLOW_FEATURE_HEADER]: "LazyFetch,InitialBody", + [WORKFLOW_PROTOCOL_VERSION_HEADER]: WORKFLOW_PROTOCOL_VERSION, + ...(this.workflowConfig.telemetry ? getTelemetryHeaders(this.workflowConfig.telemetry) : {}), + }; + + if (this.stepInfo?.lazyStep.stepType !== "Call") { + this.headers.rawHeaders[`Upstash-Forward-${WORKFLOW_PROTOCOL_VERSION_HEADER}`] = + WORKFLOW_PROTOCOL_VERSION; + } + } + + private addInvokeCount() { + if (this.invokeCount === undefined || this.invokeCount === 0) { + return; + } + const invokeCount = this.invokeCount.toString(); + + this.headers.workflowHeaders[`Forward-${WORKFLOW_INVOKE_COUNT_HEADER}`] = invokeCount; + if (this.workflowConfig.failureUrl) { + this.headers.failureHeaders[`Forward-${WORKFLOW_INVOKE_COUNT_HEADER}`] = invokeCount; + } + + // for context.call: + if (this.stepInfo?.lazyStep instanceof LazyCallStep) { + this.headers.rawHeaders[`Upstash-Forward-${WORKFLOW_INVOKE_COUNT_HEADER}`] = invokeCount; + } + } + + private addRetries() { + if ( + this.workflowConfig.retries === undefined || + this.workflowConfig.retries === DEFAULT_RETRIES + ) { + return; + } + + const retries = this.workflowConfig.retries.toString(); + + this.headers.workflowHeaders["Retries"] = retries; + if (this.workflowConfig.failureUrl) { + this.headers.failureHeaders["Retries"] = retries; + } + } + + private addFlowControl() { + if (!this.workflowConfig.flowControl) { + return; + } + const { flowControlKey, flowControlValue } = prepareFlowControl( + this.workflowConfig.flowControl + ); + + this.headers.workflowHeaders["Flow-Control-Key"] = flowControlKey; + this.headers.workflowHeaders["Flow-Control-Value"] = flowControlValue; + + if (this.workflowConfig.failureUrl) { + this.headers.failureHeaders["Flow-Control-Key"] = flowControlKey; + this.headers.failureHeaders["Flow-Control-Value"] = flowControlValue; + } + } + + private addUserHeaders() { + for (const [key, value] of this.userHeaders.entries()) { + const forwardKey = `Forward-${key}`; + this.headers.workflowHeaders[forwardKey] = value; + + if (this.workflowConfig.failureUrl) { + this.headers.failureHeaders[forwardKey] = value; + } + } + } + + private addFailureUrl() { + if (!this.workflowConfig.failureUrl) { + return; + } + + this.headers.workflowHeaders["Failure-Callback"] = this.workflowConfig.failureUrl; + + this.headers.failureHeaders[`Forward-${WORKFLOW_FAILURE_HEADER}`] = "true"; + this.headers.failureHeaders[`Forward-Upstash-Workflow-Failure-Callback`] = "true"; + this.headers.failureHeaders["Workflow-Runid"] = this.workflowConfig.workflowRunId; + this.headers.failureHeaders["Workflow-Init"] = "false"; + this.headers.failureHeaders["Workflow-Url"] = this.workflowConfig.workflowUrl; + this.headers.failureHeaders["Workflow-Calltype"] = "failureCall"; + this.headers.failureHeaders["Feature-Set"] = "LazyFetch,InitialBody"; + if ( + this.workflowConfig.retries !== undefined && + this.workflowConfig.retries !== DEFAULT_RETRIES + ) { + this.headers.failureHeaders["Retries"] = this.workflowConfig.retries.toString(); + } + } + + private addContentType() { + if (this.workflowConfig.useJSONContent) { + this.headers.rawHeaders["content-type"] = "application/json"; + return "application/json"; + } + + const callHeaders = new Headers( + this.stepInfo?.lazyStep instanceof LazyCallStep ? this.stepInfo.lazyStep.headers : {} + ); + const contentType = + (callHeaders.get("content-type") + ? callHeaders.get("content-type") + : this.userHeaders?.get("Content-Type") + ? this.userHeaders.get("Content-Type") + : undefined) ?? DEFAULT_CONTENT_TYPE; + this.headers.rawHeaders["content-type"] = contentType; + return contentType; + } + + private prefixHeaders(contentType: string): HeadersResponse { + const { rawHeaders, workflowHeaders, failureHeaders } = this.headers; + + const isCall = this.stepInfo?.lazyStep.stepType === "Call"; + return { + headers: { + ...rawHeaders, + ...addPrefixToHeaders(workflowHeaders, isCall ? "Upstash-Callback-" : "Upstash-"), + ...addPrefixToHeaders(failureHeaders, "Upstash-Failure-Callback-"), + ...(isCall ? addPrefixToHeaders(failureHeaders, "Upstash-Callback-Failure-Callback-") : {}), + }, + contentType, + }; + } +} + +function addPrefixToHeaders(headers: Record, prefix: string) { + const prefixedHeaders: Record = {}; + for (const [key, value] of Object.entries(headers)) { + prefixedHeaders[`${prefix}${key}`] = value; + } + return prefixedHeaders; +} + +export const prepareFlowControl = (flowControl: FlowControl) => { + const parallelism = flowControl.parallelism?.toString(); + const rate = flowControl.ratePerSecond?.toString(); + + const controlValue = [ + parallelism ? `parallelism=${parallelism}` : undefined, + rate ? `rate=${rate}` : undefined, + ].filter(Boolean); + + if (controlValue.length === 0) { + throw new QstashError("Provide at least one of parallelism or ratePerSecond for flowControl"); + } + + return { + flowControlKey: flowControl.key, + flowControlValue: controlValue.join(", "), + }; +}; + +export const getHeaders = (params: WorkflowHeaderParams) => { + const workflowHeaders = new WorkflowHeaders(params); + return workflowHeaders.getHeaders(); +}; diff --git a/src/qstash/submit-steps.ts b/src/qstash/submit-steps.ts new file mode 100644 index 0000000..43d4695 --- /dev/null +++ b/src/qstash/submit-steps.ts @@ -0,0 +1,134 @@ +import { NO_CONCURRENCY } from "../constants"; +import { WorkflowAbort } from "../error"; +import { WorkflowLogger } from "../logger"; +import { Telemetry } from "../types"; +import { WorkflowContext } from "../context"; +import { BaseLazyStep } from "../context/steps"; +import { getHeaders } from "./headers"; + +export const submitParallelSteps = async ({ + context, + steps, + initialStepCount, + invokeCount, + telemetry, + debug, +}: { + context: WorkflowContext; + steps: BaseLazyStep[]; + initialStepCount: number; + invokeCount: number; + telemetry?: Telemetry; + debug?: WorkflowLogger; +}) => { + const planSteps = steps.map((step, index) => + step.getPlanStep(steps.length, initialStepCount + index) + ); + + await debug?.log("SUBMIT", "SUBMIT_STEP", { + length: planSteps.length, + steps: planSteps, + }); + + const result = await context.qstashClient.batch( + planSteps.map((planStep) => { + const { headers } = getHeaders({ + initHeaderValue: "false", + workflowConfig: { + workflowRunId: context.workflowRunId, + workflowUrl: context.url, + failureUrl: context.failureUrl, + retries: context.retries, + flowControl: context.flowControl, + telemetry, + }, + userHeaders: context.headers, + invokeCount, + }); + + return { + headers, + method: "POST", + url: context.url, + body: JSON.stringify(planStep), + notBefore: planStep.sleepUntil, + delay: planStep.sleepFor, + }; + }) + ) as { messageId: string }[]; + + await debug?.log("INFO", "SUBMIT_STEP", { + messageIds: result.map((message) => { + return { + message: message.messageId, + }; + }), + }); + + + throw new WorkflowAbort(planSteps[0].stepName, planSteps[0]); +}; + +export const submitSingleStep = async ({ + context, + lazyStep, + stepId, + invokeCount, + concurrency, + telemetry, + debug, +}: { + context: WorkflowContext; + lazyStep: BaseLazyStep; + stepId: number; + invokeCount: number; + concurrency: number; + telemetry?: Telemetry; + debug?: WorkflowLogger; +}) => { + const resultStep = await lazyStep.getResultStep(concurrency, stepId); + await debug?.log("INFO", "RUN_SINGLE", { + fromRequest: false, + step: resultStep, + stepCount: stepId, + }); + + const { headers } = lazyStep.getHeaders({ + context, + step: resultStep, + invokeCount, + telemetry, + }); + const body = lazyStep.getBody({ + context, + step: resultStep, + headers, + invokeCount, + telemetry, + }); + + await debug?.log("SUBMIT", "SUBMIT_STEP", { + length: 1, + steps: [resultStep], + }); + + const submitResult = await lazyStep.submitStep({ + context, + body, + headers, + isParallel: concurrency !== NO_CONCURRENCY, + invokeCount, + step: resultStep, + telemetry, + }); + + await debug?.log("INFO", "SUBMIT_STEP", { + messageIds: submitResult.map((message) => { + return { + message: message.messageId, + }; + }), + }); + + return resultStep; +}; diff --git a/src/serve/authorization.test.ts b/src/serve/authorization.test.ts index d1038a4..b2b243e 100644 --- a/src/serve/authorization.test.ts +++ b/src/serve/authorization.test.ts @@ -214,11 +214,9 @@ describe("disabled workflow context", () => { "content-type": "application/json", "upstash-feature-set": "LazyFetch,InitialBody", "upstash-forward-upstash-workflow-sdk-version": "1", - "upstash-forward-upstash-workflow-invoke-count": "0", "upstash-workflow-sdk-version": "1", "upstash-method": "POST", "upstash-retries": "0", - "upstash-failure-callback-retries": "0", "upstash-workflow-init": "false", "upstash-workflow-runid": "wfr-bar", "upstash-workflow-url": WORKFLOW_ENDPOINT, @@ -238,6 +236,7 @@ describe("disabled workflow context", () => { steps: [], url: WORKFLOW_ENDPOINT, initialPayload: "my-payload", + retries: 0, }); let called = false; @@ -271,11 +270,9 @@ describe("disabled workflow context", () => { "content-type": "application/json", "upstash-feature-set": "LazyFetch,InitialBody", "upstash-forward-upstash-workflow-sdk-version": "1", - "upstash-forward-upstash-workflow-invoke-count": "0", "upstash-workflow-sdk-version": "1", "upstash-method": "POST", - "upstash-retries": "3", - "upstash-failure-callback-retries": "3", + "upstash-retries": "0", "upstash-workflow-init": "false", "upstash-workflow-runid": "wfr-bar", "upstash-workflow-url": WORKFLOW_ENDPOINT, @@ -295,6 +292,7 @@ describe("disabled workflow context", () => { steps: [], url: WORKFLOW_ENDPOINT, initialPayload: "my-payload", + invokeCount: 4, }); let called = false; @@ -329,11 +327,9 @@ describe("disabled workflow context", () => { "content-type": "application/json", "upstash-feature-set": "LazyFetch,InitialBody", "upstash-forward-upstash-workflow-sdk-version": "1", - "upstash-forward-upstash-workflow-invoke-count": "0", + "upstash-forward-upstash-workflow-invoke-count": "4", "upstash-workflow-sdk-version": "1", "upstash-method": "POST", - "upstash-retries": "3", - "upstash-failure-callback-retries": "3", "upstash-workflow-init": "false", "upstash-workflow-runid": "wfr-bar", "upstash-workflow-url": WORKFLOW_ENDPOINT, diff --git a/src/serve/serve-many.test.ts b/src/serve/serve-many.test.ts index bb8d3ce..a2a234d 100644 --- a/src/serve/serve-many.test.ts +++ b/src/serve/serve-many.test.ts @@ -10,98 +10,9 @@ import { } from "../test-utils"; import { nanoid } from "../utils"; import { WORKFLOW_INVOKE_COUNT_HEADER } from "../constants"; -import { Telemetry } from "../types"; -import { getNewUrlFromWorkflowId, invokeWorkflow } from "./serve-many"; +import { getNewUrlFromWorkflowId } from "./serve-many"; describe("serveMany", () => { - describe("invokeWorkflow", () => { - test("should call invokeWorkflow", async () => { - const token = nanoid(); - - const telemetry: Telemetry = { - sdk: "sdk", - framework: "framework", - runtime: "runtime", - }; - const workflowId = "some-workflow-id"; - - await mockQStashServer({ - execute: async () => { - await invokeWorkflow({ - settings: { - body: "some-body", - workflow: { - routeFunction: async () => {}, - workflowId, - options: {}, - }, - headers: { custom: "custom-header-value" }, - retries: 6, - workflowRunId: "some-run-id", - }, - invokeCount: 0, - invokeStep: { - stepId: 4, - concurrent: 1, - stepName: "invoke-step", - stepType: "Invoke", - }, - context: new WorkflowContext({ - headers: new Headers({ original: "original-headers-value" }) as Headers, - initialPayload: "initial-payload", - qstashClient: new Client({ baseUrl: MOCK_QSTASH_SERVER_URL, token }), - steps: [], - url: `${WORKFLOW_ENDPOINT}/original_workflow`, - workflowRunId: "wfr_original_workflow", - }), - telemetry, - }); - }, - responseFields: { body: "msgId", status: 200 }, - receivesRequest: { - method: "POST", - url: `${MOCK_QSTASH_SERVER_URL}/v2/publish/${WORKFLOW_ENDPOINT}/${workflowId}`, - token, - body: { - body: '"some-body"', - headers: { - "Upstash-Workflow-Init": ["false"], - "Upstash-Workflow-RunId": ["wfr_original_workflow"], - "Upstash-Workflow-Url": ["https://requestcatcher.com/api/original_workflow"], - "Upstash-Forward-Upstash-Workflow-Invoke-Count": ["0"], - "Upstash-Feature-Set": ["LazyFetch,InitialBody"], - "Upstash-Workflow-Sdk-Version": ["1"], - "content-type": ["application/json"], - "Upstash-Telemetry-Sdk": ["sdk"], - "Upstash-Telemetry-Framework": ["framework"], - "Upstash-Telemetry-Runtime": ["runtime"], - "Upstash-Forward-Upstash-Workflow-Sdk-Version": ["1"], - "Upstash-Retries": ["3"], - "Upstash-Failure-Callback-Retries": ["3"], - "Upstash-Forward-original": ["original-headers-value"], - "Upstash-Failure-Callback-Forward-original": ["original-headers-value"], - "Upstash-Workflow-Runid": ["wfr_original_workflow"], - }, - workflowRunId: "wfr_original_workflow", - workflowUrl: "https://requestcatcher.com/api/original_workflow", - step: { - stepId: 4, - concurrent: 1, - stepName: "invoke-step", - stepType: "Invoke", - }, - }, - headers: { - "upstash-retries": "6", - [`Upstash-Forward-${WORKFLOW_INVOKE_COUNT_HEADER}`]: "1", - [`Upstash-Forward-custom`]: "custom-header-value", - "Upstash-Forward-original": null, - }, - }, - }); - }); - }); - describe("serveMany", () => { test("should throw if workflowId contains '/'", () => { const throws = () => @@ -243,13 +154,10 @@ describe("serveMany", () => { body: { body: "2", headers: { - "Upstash-Failure-Callback-Retries": ["3"], - "Upstash-Forward-Upstash-Workflow-Invoke-Count": ["0"], "Upstash-Feature-Set": ["LazyFetch,InitialBody"], "Upstash-Flow-Control-Key": ["workflowTwoFlowControl"], "Upstash-Flow-Control-Value": ["parallelism=4, rate=6"], "Upstash-Forward-Upstash-Workflow-Sdk-Version": ["1"], - "Upstash-Retries": ["3"], "Upstash-Telemetry-Framework": ["nextjs"], "Upstash-Telemetry-Runtime": ["node@v22.6.0"], "Upstash-Telemetry-Sdk": ["@upstash/workflow@v0.2.7"], @@ -300,13 +208,11 @@ describe("serveMany", () => { body: { body: "2", headers: { - "Upstash-Failure-Callback-Retries": ["3"], "Upstash-Feature-Set": ["LazyFetch,InitialBody"], "Upstash-Forward-Upstash-Workflow-Invoke-Count": ["1"], "Upstash-Flow-Control-Key": ["workflowTwoFlowControl"], "Upstash-Flow-Control-Value": ["parallelism=4, rate=6"], "Upstash-Forward-Upstash-Workflow-Sdk-Version": ["1"], - "Upstash-Retries": ["3"], "Upstash-Telemetry-Framework": ["nextjs"], "Upstash-Telemetry-Runtime": ["node@v22.6.0"], "Upstash-Telemetry-Sdk": ["@upstash/workflow@v0.2.7"], @@ -373,12 +279,11 @@ describe("serveMany", () => { "upstash-callback-forward-upstash-workflow-stepid": "1", "upstash-callback-forward-upstash-workflow-stepname": "call other workflow", "upstash-callback-forward-upstash-workflow-steptype": "Call", - "upstash-callback-retries": "3", "upstash-callback-workflow-calltype": "fromCallback", "upstash-callback-workflow-init": "false", "upstash-callback-workflow-runid": "wfr_id", "upstash-callback-workflow-url": "https://requestcatcher.com/api/workflow-three", - "upstash-failure-callback-retries": "3", + "upstash-forward-upstash-workflow-invoke-count": "1", "upstash-feature-set": "WF_NoDelete,InitialBody", "upstash-method": "POST", "upstash-retries": "0", diff --git a/src/serve/serve-many.ts b/src/serve/serve-many.ts index 7ffaf1d..a111af3 100644 --- a/src/serve/serve-many.ts +++ b/src/serve/serve-many.ts @@ -1,17 +1,6 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ -import { WorkflowContext } from "../context"; import { WorkflowError } from "../error"; -import { - InvokableWorkflow, - InvokeWorkflowRequest, - LazyInvokeStepParams, - PublicServeOptions, - RouteFunction, - Step, - Telemetry, -} from "../types"; -import { getWorkflowRunId } from "../utils"; -import { getHeaders } from "../workflow-requests"; +import { InvokableWorkflow, PublicServeOptions, RouteFunction } from "../types"; export type OmitOptionsInServeMany = Omit< TOptions, @@ -102,90 +91,6 @@ export const serveManyBase = < }; }; -export const invokeWorkflow = async ({ - settings, - invokeStep, - context, - invokeCount, - telemetry, -}: { - settings: LazyInvokeStepParams; - invokeStep: Step; - context: WorkflowContext; - invokeCount: number; - telemetry?: Telemetry; -}) => { - const { - body, - workflow, - headers = {}, - workflowRunId = getWorkflowRunId(), - retries, - flowControl, - } = settings; - const { workflowId } = workflow; - - const { - retries: workflowRetries, - failureFunction, - failureUrl, - useJSONContent, - flowControl: workflowFlowControl, - } = workflow.options; - - if (!workflowId) { - throw new WorkflowError("You can only invoke workflow which has a workflowId"); - } - - const { headers: invokerHeaders } = getHeaders({ - initHeaderValue: "false", - workflowRunId: context.workflowRunId, - workflowUrl: context.url, - userHeaders: context.headers, - failureUrl: context.failureUrl, - retries: context.retries, - telemetry, - invokeCount, - flowControl: context.flowControl, - }); - invokerHeaders["Upstash-Workflow-Runid"] = context.workflowRunId; - - const newUrl = getNewUrlFromWorkflowId(context.url, workflowId); - - const { headers: triggerHeaders } = getHeaders({ - initHeaderValue: "true", - workflowRunId, - workflowUrl: newUrl, - userHeaders: new Headers(headers) as Headers, - retries: retries ?? workflowRetries, - telemetry, - failureUrl: failureFunction ? newUrl : failureUrl, - invokeCount: invokeCount + 1, - flowControl: flowControl ?? workflowFlowControl, - }); - triggerHeaders["Upstash-Workflow-Invoke"] = "true"; - if (useJSONContent) { - triggerHeaders["content-type"] = "application/json"; - } - - const request: InvokeWorkflowRequest = { - body: JSON.stringify(body), - headers: Object.fromEntries( - Object.entries(invokerHeaders).map((pairs) => [pairs[0], [pairs[1]]]) - ), - workflowRunId: context.workflowRunId, - workflowUrl: context.url, - step: invokeStep, - }; - - await context.qstashClient.publish({ - headers: triggerHeaders, - method: "POST", - body: JSON.stringify(request), - url: newUrl, - }); -}; - export const getNewUrlFromWorkflowId = (url: string, workflowId?: string) => { if (!workflowId) { throw new WorkflowError("You can only call workflow which has a workflowId"); diff --git a/src/serve/serve.test.ts b/src/serve/serve.test.ts index 94d9d8b..ed7c446 100644 --- a/src/serve/serve.test.ts +++ b/src/serve/serve.test.ts @@ -75,7 +75,6 @@ describe("serve", () => { [WORKFLOW_INIT_HEADER]: "true", [WORKFLOW_PROTOCOL_VERSION_HEADER]: "1", [`Upstash-Forward-${WORKFLOW_PROTOCOL_VERSION_HEADER}`]: "1", - "upstash-failure-callback-retries": "1", "upstash-retries": "1", "Upstash-Flow-Control-Key": "my-key", "Upstash-Flow-Control-Value": "parallelism=1", @@ -162,8 +161,6 @@ describe("serve", () => { "upstash-feature-set": "LazyFetch,InitialBody", "upstash-forward-upstash-workflow-sdk-version": "1", "upstash-forward-upstash-workflow-invoke-count": "2", - "upstash-retries": "3", - "upstash-failure-callback-retries": "3", "upstash-method": "POST", "upstash-workflow-init": "true", "upstash-workflow-url": WORKFLOW_ENDPOINT, @@ -192,8 +189,6 @@ describe("serve", () => { "upstash-feature-set": "LazyFetch,InitialBody", "upstash-forward-upstash-workflow-sdk-version": "1", "upstash-forward-upstash-workflow-invoke-count": "2", - "upstash-retries": "3", - "upstash-failure-callback-retries": "3", "upstash-method": "POST", "upstash-workflow-runid": workflowRunId, "upstash-workflow-init": "false", @@ -225,8 +220,6 @@ describe("serve", () => { "upstash-forward-upstash-workflow-sdk-version": "1", "upstash-forward-upstash-workflow-invoke-count": "2", "upstash-method": "POST", - "upstash-retries": "3", - "upstash-failure-callback-retries": "3", "upstash-workflow-runid": workflowRunId, "upstash-workflow-init": "false", "upstash-workflow-url": WORKFLOW_ENDPOINT, @@ -410,10 +403,7 @@ describe("serve", () => { "content-type": "application/json", "upstash-feature-set": "LazyFetch,InitialBody", "upstash-forward-upstash-workflow-sdk-version": "1", - "upstash-forward-upstash-workflow-invoke-count": "0", "upstash-method": "POST", - "upstash-retries": "3", - "upstash-failure-callback-retries": "3", "upstash-workflow-init": "false", "upstash-workflow-runid": "wfr-foo", "upstash-workflow-url": WORKFLOW_ENDPOINT, @@ -462,10 +452,7 @@ describe("serve", () => { "upstash-feature-set": "LazyFetch,InitialBody", "upstash-delay": "1s", "upstash-forward-upstash-workflow-sdk-version": "1", - "upstash-forward-upstash-workflow-invoke-count": "0", "upstash-method": "POST", - "upstash-retries": "3", - "upstash-failure-callback-retries": "3", "upstash-workflow-init": "false", "upstash-workflow-runid": "wfr-bar", "upstash-workflow-url": WORKFLOW_ENDPOINT, @@ -508,16 +495,14 @@ describe("serve", () => { "upstash-workflow-sdk-version": "1", "content-type": "application/json", "upstash-feature-set": "LazyFetch,InitialBody", + "upstash-failure-callback-feature-set": "LazyFetch,InitialBody", "upstash-delay": "1s", "upstash-forward-upstash-workflow-sdk-version": "1", "upstash-method": "POST", - "upstash-retries": "3", - "upstash-failure-callback-retries": "3", "upstash-workflow-init": "false", "upstash-workflow-runid": "wfr-bar", "upstash-workflow-url": WORKFLOW_ENDPOINT, "upstash-failure-callback": myFailureEndpoint, - "upstash-forward-upstash-workflow-invoke-count": "0", "upstash-failure-callback-forward-upstash-workflow-is-failure": "true", "upstash-failure-callback-forward-upstash-workflow-failure-callback": "true", "upstash-failure-callback-workflow-calltype": "failureCall", @@ -565,11 +550,10 @@ describe("serve", () => { "upstash-workflow-sdk-version": "1", "content-type": "application/json", "upstash-feature-set": "LazyFetch,InitialBody", + "upstash-failure-callback-feature-set": "LazyFetch,InitialBody", "upstash-delay": "1s", "upstash-forward-upstash-workflow-sdk-version": "1", "upstash-method": "POST", - "upstash-retries": "3", - "upstash-failure-callback-retries": "3", "upstash-workflow-init": "false", "upstash-workflow-runid": "wfr-bar", "upstash-workflow-url": WORKFLOW_ENDPOINT, @@ -580,7 +564,6 @@ describe("serve", () => { "upstash-failure-callback-workflow-init": "false", "upstash-failure-callback-workflow-runid": "wfr-bar", "upstash-failure-callback-workflow-url": "https://requestcatcher.com/api", - "upstash-forward-upstash-workflow-invoke-count": "0", "upstash-telemetry-framework": "unknown", "upstash-telemetry-runtime": "unknown", "upstash-telemetry-sdk": "@upstash/workflow@v0.2.7", @@ -592,6 +575,107 @@ describe("serve", () => { }); expect(called).toBeTrue(); }); + + test("should set failure headers in context call", async () => { + const request = getRequest(WORKFLOW_ENDPOINT, "wfr-foo", "my-payload", []); + request.headers.set(WORKFLOW_INVOKE_COUNT_HEADER, "2"); + let called = false; + const myFailureFunction: WorkflowServeOptions["failureFunction"] = async () => { + return; + }; + + const routeFunction: RouteFunction = async (context) => { + await context.call("call step", { + url: "some-url", + retries: 4, + timeout: 10, + headers: { + test: "headers", + }, + body: "body", + method: "PATCH", + }); + }; + + const { handler: endpoint } = serve(routeFunction, { + qstashClient, + receiver: undefined, + failureUrl: "some-failure-url", + retries: 2, + }); + await mockQStashServer({ + execute: async () => { + const response = await endpoint(request); + expect(response.status).toBe(200); + called = true; + }, + responseFields: { body: { messageId: "some-message-id" }, status: 200 }, + receivesRequest: { + method: "POST", + url: `${MOCK_QSTASH_SERVER_URL}/v2/batch`, + token, + body: [ + { + destination: "some-url", + headers: { + "content-type": "application/json", + "upstash-callback": "https://requestcatcher.com/api", + "upstash-callback-failure-callback-feature-set": "LazyFetch,InitialBody", + "upstash-callback-failure-callback": "some-failure-url", + "upstash-callback-failure-callback-forward-upstash-workflow-failure-callback": + "true", + "upstash-callback-failure-callback-forward-upstash-workflow-is-failure": "true", + "upstash-callback-failure-callback-retries": "2", + "upstash-callback-failure-callback-workflow-calltype": "failureCall", + "upstash-callback-failure-callback-workflow-init": "false", + "upstash-callback-failure-callback-workflow-runid": "wfr-foo", + "upstash-callback-failure-callback-workflow-url": "https://requestcatcher.com/api", + "upstash-callback-forward-upstash-workflow-callback": "true", + "upstash-callback-feature-set": "LazyFetch,InitialBody", + // invoke counts: + "upstash-callback-failure-callback-forward-upstash-workflow-invoke-count": "2", + "upstash-failure-callback-forward-upstash-workflow-invoke-count": "2", + "upstash-callback-forward-upstash-workflow-invoke-count": "2", + "upstash-forward-upstash-workflow-invoke-count": "2", + "upstash-callback-forward-upstash-workflow-concurrent": "1", + "upstash-callback-forward-upstash-workflow-contenttype": "application/json", + "upstash-callback-forward-upstash-workflow-stepid": "1", + "upstash-callback-forward-upstash-workflow-stepname": "call step", + "upstash-callback-forward-upstash-workflow-steptype": "Call", + "upstash-callback-retries": "2", + "upstash-callback-workflow-calltype": "fromCallback", + "upstash-callback-workflow-init": "false", + "upstash-callback-workflow-runid": "wfr-foo", + "upstash-callback-workflow-url": "https://requestcatcher.com/api", + "upstash-failure-callback-feature-set": "LazyFetch,InitialBody", + "upstash-failure-callback-forward-upstash-workflow-failure-callback": "true", + "upstash-failure-callback-forward-upstash-workflow-is-failure": "true", + "upstash-failure-callback-retries": "2", + "upstash-failure-callback-workflow-calltype": "failureCall", + "upstash-failure-callback-workflow-init": "false", + "upstash-failure-callback-workflow-runid": "wfr-foo", + "upstash-failure-callback-workflow-url": "https://requestcatcher.com/api", + "upstash-feature-set": "WF_NoDelete,InitialBody", + "upstash-forward-test": "headers", + "upstash-method": "PATCH", + "upstash-retries": "4", + "upstash-telemetry-framework": "unknown", + "upstash-telemetry-runtime": "unknown", + "upstash-telemetry-sdk": "@upstash/workflow@v0.2.7", + "upstash-timeout": "10", + "upstash-workflow-calltype": "toCallback", + "upstash-workflow-init": "false", + "upstash-workflow-runid": "wfr-foo", + "upstash-workflow-sdk-version": "1", + "upstash-workflow-url": "https://requestcatcher.com/api", + }, + body: '"body"', + }, + ], + }, + }); + expect(called).toBeTrue(); + }); }); describe("should replace baseUrl correctly", () => { @@ -749,6 +833,11 @@ describe("serve", () => { { qstashClient, receiver: undefined, + retries: 2, + flowControl: { + key: "fc-key", + ratePerSecond: 2, + }, } ); let called = false; @@ -776,9 +865,9 @@ describe("serve", () => { "content-type": ["application/json"], "Upstash-Feature-Set": ["LazyFetch,InitialBody"], "Upstash-Forward-Upstash-Workflow-Sdk-Version": ["1"], - "Upstash-Forward-Upstash-Workflow-Invoke-Count": ["0"], - "Upstash-Retries": ["3"], - "Upstash-Failure-Callback-Retries": ["3"], + "Upstash-Retries": ["2"], + "Upstash-Flow-Control-Key": ["fc-key"], + "Upstash-Flow-Control-Value": ["rate=2"], "Upstash-Workflow-CallType": ["step"], "Upstash-Workflow-Init": ["false"], "Upstash-Workflow-RunId": ["wfr-bar"], @@ -1052,17 +1141,15 @@ describe("serve", () => { "upstash-workflow-sdk-version": "1", "content-type": "application/json", "upstash-feature-set": "LazyFetch,InitialBody", + "upstash-failure-callback-feature-set": "LazyFetch,InitialBody", "upstash-delay": "1s", "upstash-forward-upstash-workflow-sdk-version": "1", "upstash-method": "POST", - "upstash-retries": "3", - "upstash-failure-callback-retries": "3", "upstash-failure-callback-forward-upstash-workflow-failure-callback": "true", "upstash-failure-callback-workflow-calltype": "failureCall", "upstash-failure-callback-workflow-init": "false", "upstash-failure-callback-workflow-runid": "wfr-bar", "upstash-failure-callback-workflow-url": "https://requestcatcher.com/api", - "upstash-forward-upstash-workflow-invoke-count": "0", "upstash-workflow-init": "false", "upstash-workflow-runid": "wfr-bar", "upstash-workflow-url": WORKFLOW_ENDPOINT, diff --git a/src/workflow-requests.test.ts b/src/workflow-requests.test.ts index 8ec94de..1baf733 100644 --- a/src/workflow-requests.test.ts +++ b/src/workflow-requests.test.ts @@ -3,7 +3,6 @@ import { afterAll, describe, expect, spyOn, test } from "bun:test"; import { nanoid } from "./utils"; import { - getHeaders, handleThirdPartyCallResult, recreateUserHeaders, triggerFirstInvocation, @@ -32,6 +31,8 @@ import { } from "./test-utils"; import { WorkflowLogger } from "./logger"; import { FinishState } from "./integration.test"; +import { getHeaders } from "./qstash/headers"; +import { LazyCallStep, LazyFunctionStep, LazyWaitForEventStep } from "./context/steps"; describe("Workflow Requests", () => { test("should send first invocation request", async () => { @@ -305,6 +306,7 @@ describe("Workflow Requests", () => { }, headers: { "upstash-retries": "2", + "upstash-failure-callback": WORKFLOW_ENDPOINT, }, }, }); @@ -443,14 +445,17 @@ describe("Workflow Requests", () => { describe("getHeaders", () => { const workflowRunId = nanoid(); test("should create headers without step passed", () => { - const { headers, timeoutHeaders } = getHeaders({ + const { headers } = getHeaders({ initHeaderValue: "true", - workflowRunId, - workflowUrl: WORKFLOW_ENDPOINT, - flowControl: { - key: "initial-key", - parallelism: 2, + workflowConfig: { + workflowRunId, + workflowUrl: WORKFLOW_ENDPOINT, + flowControl: { + key: "initial-key", + parallelism: 2, + }, }, + userHeaders: new Headers() as Headers, }); expect(headers).toEqual({ [WORKFLOW_INIT_HEADER]: "true", @@ -463,29 +468,30 @@ describe("Workflow Requests", () => { "Upstash-Flow-Control-Key": "initial-key", "Upstash-Flow-Control-Value": "parallelism=2", }); - expect(timeoutHeaders).toBeUndefined(); }); - test("should create headers with a result step", () => { + test("should create headers with a result step", async () => { const stepId = 3; const stepName = "some step"; - const stepType: StepType = "Run"; - const { headers, timeoutHeaders } = getHeaders({ + const lazyStep = new LazyFunctionStep(stepName, () => {}); + const { headers } = getHeaders({ initHeaderValue: "false", - workflowRunId, - workflowUrl: WORKFLOW_ENDPOINT, - step: { - stepId, - stepName, - stepType: stepType, - concurrent: 1, + workflowConfig: { + workflowRunId, + workflowUrl: WORKFLOW_ENDPOINT, + flowControl: { + key: "step-key", + ratePerSecond: 3, + }, }, - flowControl: { - key: "step-key", - ratePerSecond: 3, + stepInfo: { + step: await lazyStep.getResultStep(1, stepId), + lazyStep, }, + userHeaders: new Headers() as Headers, }); + expect(headers).toEqual({ [WORKFLOW_INIT_HEADER]: "false", [WORKFLOW_ID_HEADER]: workflowRunId, @@ -497,13 +503,11 @@ describe("Workflow Requests", () => { "Upstash-Flow-Control-Key": "step-key", "Upstash-Flow-Control-Value": "rate=3", }); - expect(timeoutHeaders).toBeUndefined(); }); - test("should create headers with a call step", () => { + test("should create headers with a call step", async () => { const stepId = 3; const stepName = "some step"; - const stepType: StepType = "Call"; const callUrl = "https://www.some-call-endpoint.com/api"; const callMethod = "GET"; const callHeaders = { @@ -511,31 +515,36 @@ describe("Workflow Requests", () => { }; const callBody = undefined; - const { headers, timeoutHeaders } = getHeaders({ - initHeaderValue: "false", - workflowRunId, - workflowUrl: WORKFLOW_ENDPOINT, - step: { - stepId, - stepName, - stepType: stepType, - concurrent: 1, - callUrl, - callMethod, - callHeaders, - callBody, - }, - invokeCount: 3, - flowControl: { - key: "regular-flow-key", - ratePerSecond: 3, - parallelism: 4, - }, - callFlowControl: { + const lazyStep = new LazyCallStep( + stepName, + callUrl, + callMethod, + callBody, + callHeaders, + 0, + undefined, + { key: "call-flow-key", ratePerSecond: 5, parallelism: 6, - }, + } + ); + const { headers } = lazyStep.getHeaders({ + context: new WorkflowContext({ + qstashClient: new Client({ baseUrl: MOCK_SERVER_URL, token: "myToken" }), + workflowRunId, + headers: new Headers() as Headers, + steps: [], + url: WORKFLOW_ENDPOINT, + initialPayload: undefined, + flowControl: { + key: "regular-flow-key", + ratePerSecond: 3, + parallelism: 4, + }, + }), + invokeCount: 3, + step: await lazyStep.getResultStep(1, stepId), }); expect(headers).toEqual({ [WORKFLOW_INIT_HEADER]: "false", @@ -544,6 +553,7 @@ describe("Workflow Requests", () => { [WORKFLOW_FEATURE_HEADER]: "WF_NoDelete,InitialBody", [WORKFLOW_PROTOCOL_VERSION_HEADER]: WORKFLOW_PROTOCOL_VERSION, "Upstash-Callback-Forward-Upstash-Workflow-Invoke-Count": "3", + "Upstash-Forward-Upstash-Workflow-Invoke-Count": "3", "Upstash-Callback-Feature-Set": "LazyFetch,InitialBody", "Upstash-Retries": "0", "Upstash-Callback": WORKFLOW_ENDPOINT, @@ -560,32 +570,36 @@ describe("Workflow Requests", () => { "Upstash-Forward-my-custom-header": "my-custom-header-value", "Upstash-Workflow-CallType": "toCallback", "content-type": "application/json", + // flow control: "Upstash-Callback-Flow-Control-Key": "regular-flow-key", "Upstash-Callback-Flow-Control-Value": "parallelism=4, rate=3", "Upstash-Flow-Control-Key": "call-flow-key", "Upstash-Flow-Control-Value": "parallelism=6, rate=5", }); - expect(timeoutHeaders).toBeUndefined(); }); test("should include failure header", () => { const failureUrl = "https://my-failure-endpoint.com"; - const { headers, timeoutHeaders } = getHeaders({ + const { headers } = getHeaders({ initHeaderValue: "true", - workflowRunId, - workflowUrl: WORKFLOW_ENDPOINT, - userHeaders: new Headers() as Headers, - failureUrl, - flowControl: { - key: "failure-key", - parallelism: 2, + workflowConfig: { + workflowRunId, + workflowUrl: WORKFLOW_ENDPOINT, + failureUrl, + flowControl: { + key: "failure-key", + parallelism: 2, + }, + retries: 6, }, + userHeaders: new Headers() as Headers, }); expect(headers).toEqual({ [WORKFLOW_INIT_HEADER]: "true", [WORKFLOW_ID_HEADER]: workflowRunId, [WORKFLOW_URL_HEADER]: WORKFLOW_ENDPOINT, [WORKFLOW_FEATURE_HEADER]: "LazyFetch,InitialBody", + "Upstash-Failure-Callback-Feature-Set": "LazyFetch,InitialBody", [WORKFLOW_PROTOCOL_VERSION_HEADER]: WORKFLOW_PROTOCOL_VERSION, [`Upstash-Forward-${WORKFLOW_PROTOCOL_VERSION_HEADER}`]: WORKFLOW_PROTOCOL_VERSION, [`Upstash-Failure-Callback-Forward-${WORKFLOW_FAILURE_HEADER}`]: "true", @@ -600,28 +614,38 @@ describe("Workflow Requests", () => { "Upstash-Failure-Callback-Flow-Control-Value": "parallelism=2", "Upstash-Flow-Control-Key": "failure-key", "Upstash-Flow-Control-Value": "parallelism=2", + "Upstash-Failure-Callback-Retries": "6", + "Upstash-Retries": "6", }); - expect(timeoutHeaders).toBeUndefined(); }); - test("should return timeout headers for wait step", () => { - const { headers, timeoutHeaders } = getHeaders({ - initHeaderValue: "false", + test("should return timeout headers for wait step", async () => { + const lazyStep = new LazyWaitForEventStep("waiting-step-name", "wait event id", "20s"); + + const step = await lazyStep.getResultStep(1, 1); + const context = new WorkflowContext({ + headers: new Headers() as Headers, + initialPayload: undefined, + qstashClient: new Client({ baseUrl: MOCK_SERVER_URL, token: "token" }), + steps: [], + url: WORKFLOW_ENDPOINT, workflowRunId, - workflowUrl: WORKFLOW_ENDPOINT, - step: { - stepId: 1, - stepName: "waiting-step-name", - stepType: "Wait", - concurrent: 1, - waitEventId: "wait event id", - timeout: "20s", - }, flowControl: { key: "wait-key", parallelism: 2, }, }); + const { headers } = lazyStep.getHeaders({ + context, + step, + invokeCount: 0, + }); + const body = lazyStep.getBody({ + context, + headers, + invokeCount: 0, + step, + }); expect(headers).toEqual({ "Upstash-Workflow-Init": "false", "Upstash-Workflow-RunId": workflowRunId, @@ -634,18 +658,25 @@ describe("Workflow Requests", () => { "Upstash-Flow-Control-Key": "wait-key", "Upstash-Flow-Control-Value": "parallelism=2", }); - expect(timeoutHeaders).toEqual({ - "Upstash-Workflow-Init": ["false"], - "Upstash-Workflow-RunId": [workflowRunId], - "Upstash-Workflow-Url": [WORKFLOW_ENDPOINT], - [WORKFLOW_PROTOCOL_VERSION_HEADER]: [WORKFLOW_PROTOCOL_VERSION], - [WORKFLOW_FEATURE_HEADER]: ["LazyFetch,InitialBody"], - "Upstash-Forward-Upstash-Workflow-Sdk-Version": ["1"], - "Upstash-Workflow-Runid": [workflowRunId], - "Upstash-Workflow-CallType": ["step"], - "content-type": ["application/json"], - "Upstash-Flow-Control-Key": ["wait-key"], - "Upstash-Flow-Control-Value": ["parallelism=2"], + expect(typeof body).toBe("string"); + expect(JSON.parse(body)).toEqual({ + url: "https://requestcatcher.com/api", + timeout: "20s", + timeoutUrl: "https://requestcatcher.com/api", + timeoutHeaders: { + "Upstash-Workflow-Init": ["false"], + "Upstash-Workflow-RunId": [workflowRunId], + "Upstash-Workflow-Url": [WORKFLOW_ENDPOINT], + [WORKFLOW_FEATURE_HEADER]: ["LazyFetch,InitialBody"], + [WORKFLOW_PROTOCOL_VERSION_HEADER]: [WORKFLOW_PROTOCOL_VERSION], + "Upstash-Forward-Upstash-Workflow-Sdk-Version": ["1"], + "content-type": ["application/json"], + "Upstash-Flow-Control-Key": ["wait-key"], + "Upstash-Flow-Control-Value": ["parallelism=2"], + "Upstash-Workflow-CallType": ["step"], + "Upstash-Workflow-Runid": [workflowRunId], + }, + step: { stepId: 1, stepType: "Wait", stepName: "waiting-step-name", concurrent: 1 }, }); }); }); @@ -923,7 +954,6 @@ describe("Workflow Requests", () => { [WORKFLOW_PROTOCOL_VERSION_HEADER]: WORKFLOW_PROTOCOL_VERSION, "Upstash-Forward-Upstash-Workflow-Sdk-Version": "1", "Upstash-Retries": "0", - "Upstash-Failure-Callback-Retries": "0", "content-type": "application/json", }, requestPayload: undefined, diff --git a/src/workflow-requests.ts b/src/workflow-requests.ts index 01a2661..e2d253e 100644 --- a/src/workflow-requests.ts +++ b/src/workflow-requests.ts @@ -3,22 +3,14 @@ import { err, ok } from "neverthrow"; import { WorkflowAbort, WorkflowError } from "./error"; import type { WorkflowContext } from "./context"; import { - DEFAULT_CONTENT_TYPE, TELEMETRY_HEADER_FRAMEWORK, TELEMETRY_HEADER_RUNTIME, TELEMETRY_HEADER_SDK, - WORKFLOW_FAILURE_HEADER, - WORKFLOW_FEATURE_HEADER, WORKFLOW_ID_HEADER, - WORKFLOW_INIT_HEADER, WORKFLOW_INVOKE_COUNT_HEADER, - WORKFLOW_PROTOCOL_VERSION, - WORKFLOW_PROTOCOL_VERSION_HEADER, - WORKFLOW_URL_HEADER, } from "./constants"; import type { CallResponse, - HeaderParams, Step, StepType, Telemetry, @@ -30,6 +22,7 @@ import { StepTypes } from "./types"; import type { WorkflowLogger } from "./logger"; import { FlowControl, QstashError } from "@upstash/qstash"; import { getSteps } from "./client/utils"; +import { getHeaders } from "./qstash/headers"; export const triggerFirstInvocation = async ({ workflowContext, @@ -46,14 +39,17 @@ export const triggerFirstInvocation = async ({ }): Promise | Err> => { const { headers } = getHeaders({ initHeaderValue: "true", - workflowRunId: workflowContext.workflowRunId, - workflowUrl: workflowContext.url, + workflowConfig: { + workflowRunId: workflowContext.workflowRunId, + workflowUrl: workflowContext.url, + failureUrl: workflowContext.failureUrl, + retries: workflowContext.retries, + telemetry, + flowControl: workflowContext.flowControl, + useJSONContent: useJSONContent ?? false, + }, + invokeCount: invokeCount ?? 0, userHeaders: workflowContext.headers, - failureUrl: workflowContext.failureUrl, - retries: workflowContext.retries, - telemetry, - invokeCount, - flowControl: workflowContext.flowControl, }); // QStash doesn't forward content-type when passed in `upstash-forward-content-type` @@ -337,14 +333,16 @@ export const handleThirdPartyCallResult = async ({ const userHeaders = recreateUserHeaders(request.headers as Headers); const { headers: requestHeaders } = getHeaders({ initHeaderValue: "false", - workflowRunId, - workflowUrl, + workflowConfig: { + workflowRunId, + workflowUrl, + failureUrl, + retries, + telemetry, + flowControl, + }, userHeaders, - failureUrl, - retries, - telemetry, invokeCount: Number(invokeCount), - flowControl, }); const callResponse: CallResponse = { @@ -391,7 +389,7 @@ export const handleThirdPartyCallResult = async ({ export type HeadersResponse = { headers: Record; - timeoutHeaders?: Record; + contentType: string; }; export const getTelemetryHeaders = (telemetry: Telemetry) => { @@ -402,194 +400,6 @@ export const getTelemetryHeaders = (telemetry: Telemetry) => { }; }; -/** - * Gets headers for calling QStash - * - * See HeaderParams for more details about parameters. - * - * @returns headers to submit - */ -export const getHeaders = ({ - initHeaderValue, - workflowRunId, - workflowUrl, - userHeaders, - failureUrl, - retries, - step, - callRetries, - callTimeout, - telemetry, - invokeCount, - flowControl, - callFlowControl, -}: HeaderParams): HeadersResponse => { - const callHeaders = new Headers(step?.callHeaders); - const contentType = - (callHeaders.get("content-type") - ? callHeaders.get("content-type") - : userHeaders?.get("Content-Type") - ? userHeaders.get("Content-Type") - : undefined) ?? DEFAULT_CONTENT_TYPE; - - const baseHeaders: Record = { - [WORKFLOW_INIT_HEADER]: initHeaderValue, - [WORKFLOW_ID_HEADER]: workflowRunId, - [WORKFLOW_URL_HEADER]: workflowUrl, - [WORKFLOW_FEATURE_HEADER]: "LazyFetch,InitialBody", - [WORKFLOW_PROTOCOL_VERSION_HEADER]: WORKFLOW_PROTOCOL_VERSION, - "content-type": contentType, - ...(telemetry ? getTelemetryHeaders(telemetry) : {}), - }; - - if (invokeCount !== undefined && !step?.callUrl) { - baseHeaders[`Upstash-Forward-${WORKFLOW_INVOKE_COUNT_HEADER}`] = invokeCount.toString(); - } - - if (!step?.callUrl) { - baseHeaders[`Upstash-Forward-${WORKFLOW_PROTOCOL_VERSION_HEADER}`] = WORKFLOW_PROTOCOL_VERSION; - } - if (callTimeout) { - baseHeaders[`Upstash-Timeout`] = callTimeout.toString(); - } - - if (failureUrl) { - baseHeaders[`Upstash-Failure-Callback-Forward-${WORKFLOW_FAILURE_HEADER}`] = "true"; - baseHeaders[`Upstash-Failure-Callback-Forward-Upstash-Workflow-Failure-Callback`] = "true"; - baseHeaders["Upstash-Failure-Callback-Workflow-Runid"] = workflowRunId; - baseHeaders["Upstash-Failure-Callback-Workflow-Init"] = "false"; - baseHeaders["Upstash-Failure-Callback-Workflow-Url"] = workflowUrl; - baseHeaders["Upstash-Failure-Callback-Workflow-Calltype"] = "failureCall"; - if (retries !== undefined) { - baseHeaders["Upstash-Failure-Callback-Retries"] = retries.toString(); - } - - if (flowControl) { - const { flowControlKey, flowControlValue } = prepareFlowControl(flowControl); - baseHeaders["Upstash-Failure-Callback-Flow-Control-Key"] = flowControlKey; - baseHeaders["Upstash-Failure-Callback-Flow-Control-Value"] = flowControlValue; - } - - if (!step?.callUrl) { - baseHeaders["Upstash-Failure-Callback"] = failureUrl; - } - } - - // if retries is set or if call url is passed, set a retry - // for call url, retry is 0 - if (step?.callUrl) { - baseHeaders["Upstash-Retries"] = callRetries?.toString() ?? "0"; - baseHeaders[WORKFLOW_FEATURE_HEADER] = "WF_NoDelete,InitialBody"; - - // if some retries is set, use it in callback and failure callback - if (retries !== undefined) { - baseHeaders["Upstash-Callback-Retries"] = retries.toString(); - baseHeaders["Upstash-Failure-Callback-Retries"] = retries.toString(); - } - - if (callFlowControl) { - const { flowControlKey, flowControlValue } = prepareFlowControl(callFlowControl); - - baseHeaders["Upstash-Flow-Control-Key"] = flowControlKey; - baseHeaders["Upstash-Flow-Control-Value"] = flowControlValue; - } - - if (flowControl) { - const { flowControlKey, flowControlValue } = prepareFlowControl(flowControl); - - baseHeaders["Upstash-Callback-Flow-Control-Key"] = flowControlKey; - baseHeaders["Upstash-Callback-Flow-Control-Value"] = flowControlValue; - } - } else { - if (flowControl) { - const { flowControlKey, flowControlValue } = prepareFlowControl(flowControl); - - baseHeaders["Upstash-Flow-Control-Key"] = flowControlKey; - baseHeaders["Upstash-Flow-Control-Value"] = flowControlValue; - } - - if (retries !== undefined) { - baseHeaders["Upstash-Retries"] = retries.toString(); - baseHeaders["Upstash-Failure-Callback-Retries"] = retries.toString(); - } - } - - if (userHeaders) { - for (const header of userHeaders.keys()) { - if (step?.callHeaders) { - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - baseHeaders[`Upstash-Callback-Forward-${header}`] = userHeaders.get(header)!; - } else { - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - baseHeaders[`Upstash-Forward-${header}`] = userHeaders.get(header)!; - } - baseHeaders[`Upstash-Failure-Callback-Forward-${header}`] = userHeaders.get(header)!; - } - } - - if (step?.callHeaders) { - const forwardedHeaders = Object.fromEntries( - Object.entries(step.callHeaders).map(([header, value]) => [ - `Upstash-Forward-${header}`, - value, - ]) - ); - - return { - headers: { - ...baseHeaders, - ...forwardedHeaders, - "Upstash-Callback": workflowUrl, - "Upstash-Callback-Workflow-RunId": workflowRunId, - "Upstash-Callback-Workflow-CallType": "fromCallback", - "Upstash-Callback-Workflow-Init": "false", - "Upstash-Callback-Workflow-Url": workflowUrl, - "Upstash-Callback-Feature-Set": "LazyFetch,InitialBody", - - "Upstash-Callback-Forward-Upstash-Workflow-Callback": "true", - "Upstash-Callback-Forward-Upstash-Workflow-StepId": step.stepId.toString(), - "Upstash-Callback-Forward-Upstash-Workflow-StepName": step.stepName, - "Upstash-Callback-Forward-Upstash-Workflow-StepType": step.stepType, - "Upstash-Callback-Forward-Upstash-Workflow-Concurrent": step.concurrent.toString(), - "Upstash-Callback-Forward-Upstash-Workflow-ContentType": contentType, - [`Upstash-Callback-Forward-${WORKFLOW_INVOKE_COUNT_HEADER}`]: (invokeCount ?? 0).toString(), - "Upstash-Workflow-CallType": "toCallback", - }, - }; - } - - if (step?.waitEventId) { - return { - headers: { - ...baseHeaders, - "Upstash-Workflow-CallType": "step", - }, - timeoutHeaders: { - // to include user headers: - ...Object.fromEntries( - Object.entries(baseHeaders).map(([header, value]) => [header, [value]]) - ), - // to include telemetry headers: - ...(telemetry - ? Object.fromEntries( - Object.entries(getTelemetryHeaders(telemetry)).map(([header, value]) => [ - header, - [value], - ]) - ) - : {}), - // note: using WORKFLOW_ID_HEADER doesn't work, because Runid -> RunId: - "Upstash-Workflow-Runid": [workflowRunId], - [WORKFLOW_INIT_HEADER]: ["false"], - [WORKFLOW_URL_HEADER]: [workflowUrl], - "Upstash-Workflow-CallType": ["step"], - }, - }; - } - - return { headers: baseHeaders }; -}; - export const verifyRequest = async ( body: string, signature: string | null, @@ -618,22 +428,3 @@ export const verifyRequest = async ( ); } }; - -const prepareFlowControl = (flowControl: FlowControl) => { - const parallelism = flowControl.parallelism?.toString(); - const rate = flowControl.ratePerSecond?.toString(); - - const controlValue = [ - parallelism ? `parallelism=${parallelism}` : undefined, - rate ? `rate=${rate}` : undefined, - ].filter(Boolean); - - if (controlValue.length === 0) { - throw new QstashError("Provide at least one of parallelism or ratePerSecond for flowControl"); - } - - return { - flowControlKey: flowControl.key, - flowControlValue: controlValue.join(", "), - }; -};