diff --git a/.github/workflows/node.js.yml b/.github/workflows/node.js.yml index 1a86f01..26591bb 100644 --- a/.github/workflows/node.js.yml +++ b/.github/workflows/node.js.yml @@ -8,9 +8,9 @@ jobs: build: runs-on: ubuntu-latest steps: - - uses: actions/checkout@08eba0b27e820071cde6df949e0beb9ba4906955 # v4 + - uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5 - name: Use Node.js - uses: actions/setup-node@49933ea5288caeca8642d1e84afbd3f7d6820020 # v4 + uses: actions/setup-node@a0853c24544627f65ddf259abe73b1d18a591444 # v5 - run: npm i - run: npm run build - run: npx pkg-pr-new publish diff --git a/CHANGELOG.md b/CHANGELOG.md index 3a9c558..efbcba4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,20 @@ # Changelog +## 0.2.8 alpha + +- Adds asynchronous events - wait for an event in a workflow, send + events asynchronously - allows pause/resume, human-in-loop, etc. +- Supports nested workflows with step.runWorkflow. +- Surfaces return value of the workflow in the status +- You can start a workflow directly from the CLI / dashboard without having to + make a mutation to call workflow.start: + - `{ fn: "path/to/file:workflowName", args: { ...your workflow args } }` +- Reduces read bandwidth when reading the journal after running many steps in parallel. +- Simplifies the onComplete type requirement so you can accept a workflowId as a string. + This helps when you have statically generated types which can't do branded strings. +- Adds a /test entrypoint to make testing easier +- Exports the `WorkflowCtx` and `WorkflowStep` types + ## 0.2.7 - Support for console logging & timing in workflows diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index ced4222..d1fd47d 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -3,7 +3,7 @@ ## Running locally ```sh -npm run setup +npm i npm run dev ``` @@ -11,6 +11,7 @@ npm run dev ```sh npm run clean +npm run build npm run typecheck npm run lint npm run test diff --git a/README.md b/README.md index a804d52..f6b4074 100644 --- a/README.md +++ b/README.md @@ -1,16 +1,18 @@ -# Convex Workflow +# Convex Durable Workflows [![npm version](https://badge.fury.io/js/@convex-dev%2Fworkflow.svg?)](https://badge.fury.io/js/@convex-dev%2Fworkflow) +The Workflow component enables you + Have you ever wanted to run a series of functions reliably and durably, where each can have its own retry behavior, the overall workflow will survive server restarts, and you can have long-running workflows spanning months that can be canceled? Do you want to observe the status of a workflow reactively, as well as the results written from each step? -And do you want to do this with code, instead of a DSL? +And do you want to do this with code, instead of a static configuration? Welcome to the world of Convex workflows. @@ -32,23 +34,39 @@ import { components } from "./_generated/api"; export const workflow = new WorkflowManager(components.workflow); -export const exampleWorkflow = workflow.define({ +export const userOnboarding = workflow.define({ args: { - storageId: v.id("_storage"), + userId: v.id("users"), }, - handler: async (step, args): Promise => { - const transcription = await step.runAction( - internal.index.computeTranscription, + handler: async (ctx, args): Promise => { + const status = await ctx.runMutation( + internal.emails.sendVerificationEmail, { storageId: args.storageId }, ); - const embedding = await step.runAction( - internal.index.computeEmbedding, - { transcription }, - // Run this a month after the transcription is computed. - { runAfter: 30 * 24 * 60 * 60 * 1000 }, + if (status === "needsVerification") { + // Waits until verification is completed asynchronously. + await ctx.awaitEvent({ name: "verificationEmail" }); + } + const result = await ctx.runAction( + internal.llm.generateCustomContent, + { userId: args.userId }, + // Retry this on transient errors with the default retry policy. + { retry: true }, + ); + if (result.needsHumanInput) { + // Run a whole workflow as a single step. + await ctx.runWorkflow(internal.llm.refineContentWorkflow, { + userId: args.userId, + }); + } + + await ctx.runMutation( + internal.emails.sendFollowUpEmailMaybe, + { userId: args.userId }, + // Runs one day after the previous step. + { runAfter: 24 * 60 * 60 * 1000 }, ); - return embedding; }, }); ``` @@ -97,9 +115,9 @@ is designed to feel like a Convex action but with a few restrictions: 1. The workflow runs in the background, so it can't return a value. 2. The workflow must be _deterministic_, so it should implement most of its logic - by calling out to other Convex functions. We will be lifting some of these - restrictions over time by implementing `Math.random()`, `Date.now()`, and - `fetch` within our workflow environment. + by calling out to other Convex functions. We restrict access to some + non-deterministic functions like `Math.random()` and `fetch`. Others we + patch, such as `console` for logging and `Date` for time. Note: To help avoid type cycles, always annotate the return type of the `handler` with the return type of the workflow. @@ -107,7 +125,9 @@ with the return type of the workflow. ```ts export const exampleWorkflow = workflow.define({ args: { name: v.string() }, + returns: v.string(), handler: async (step, args): Promise => { + // ^ Specify the return type of the handler const queryResult = await step.runQuery( internal.example.exampleQuery, args, @@ -283,11 +303,13 @@ export const exampleWorkflow = workflow.define({ }); ``` -### Specifying how many workflows can run in parallel +### Specifying step parallelism You can specify how many steps can run in parallel by setting the `maxParallelism` workpool option. It has a reasonable default. -On the free tier, you should not exceed 20. +On the free tier, you should not exceed 20, otherwise your other scheduled +functions may become delayed while competing for available functions with your +workflow steps. On a Pro account, you should not exceed 100 across all your workflows and workpools. If you want to do a lot of work in parallel, you should employ batching, where each workflow operates on a batch of work, e.g. scraping a list of links instead diff --git a/example/convex/_generated/api.d.ts b/example/convex/_generated/api.d.ts index 6651140..02f0590 100644 --- a/example/convex/_generated/api.d.ts +++ b/example/convex/_generated/api.d.ts @@ -10,6 +10,8 @@ import type * as admin from "../admin.js"; import type * as example from "../example.js"; +import type * as nestedWorkflow from "../nestedWorkflow.js"; +import type * as passingSignals from "../passingSignals.js"; import type * as transcription from "../transcription.js"; import type * as userConfirmation from "../userConfirmation.js"; @@ -30,6 +32,8 @@ import type { declare const fullApi: ApiFromModules<{ admin: typeof admin; example: typeof example; + nestedWorkflow: typeof nestedWorkflow; + passingSignals: typeof passingSignals; transcription: typeof transcription; userConfirmation: typeof userConfirmation; }>; @@ -63,7 +67,7 @@ export declare const components: { | { kind: "success"; returnValue: any } | { error: string; kind: "failed" } | { kind: "canceled" }; - workflowId: string; + workflowId?: string; workpoolOptions?: { defaultRetryBehavior?: { base: number; @@ -105,6 +109,21 @@ export declare const components: { startedAt: number; workId?: string; } + | { + args: any; + argsSize: number; + completedAt?: number; + handle: string; + inProgress: boolean; + kind: "workflow"; + name: string; + runResult?: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + startedAt: number; + workflowId?: string; + } | { args: { eventId?: string }; argsSize: number; @@ -118,7 +137,6 @@ export declare const components: { | { error: string; kind: "failed" } | { kind: "canceled" }; startedAt: number; - workId?: string; }; stepNumber: number; workflowId: string; @@ -170,6 +188,21 @@ export declare const components: { startedAt: number; workId?: string; } + | { + args: any; + argsSize: number; + completedAt?: number; + handle: string; + inProgress: boolean; + kind: "workflow"; + name: string; + runResult?: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + startedAt: number; + workflowId?: string; + } | { args: { eventId?: string }; argsSize: number; @@ -183,7 +216,6 @@ export declare const components: { | { error: string; kind: "failed" } | { kind: "canceled" }; startedAt: number; - workId?: string; }; }>; workflowId: string; @@ -218,6 +250,21 @@ export declare const components: { startedAt: number; workId?: string; } + | { + args: any; + argsSize: number; + completedAt?: number; + handle: string; + inProgress: boolean; + kind: "workflow"; + name: string; + runResult?: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + startedAt: number; + workflowId?: string; + } | { args: { eventId?: string }; argsSize: number; @@ -231,7 +278,6 @@ export declare const components: { | { error: string; kind: "failed" } | { kind: "canceled" }; startedAt: number; - workId?: string; }; stepNumber: number; workflowId: string; @@ -302,6 +348,21 @@ export declare const components: { startedAt: number; workId?: string; } + | { + args: any; + argsSize: number; + completedAt?: number; + handle: string; + inProgress: boolean; + kind: "workflow"; + name: string; + runResult?: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + startedAt: number; + workflowId?: string; + } | { args: { eventId?: string }; argsSize: number; @@ -315,7 +376,6 @@ export declare const components: { | { error: string; kind: "failed" } | { kind: "canceled" }; startedAt: number; - workId?: string; }; stepNumber: number; workflowId: string; @@ -339,6 +399,45 @@ export declare const components: { }; } >; + listSteps: FunctionReference< + "query", + "internal", + { + order: "asc" | "desc"; + paginationOpts: { + cursor: string | null; + endCursor?: string | null; + id?: number; + maximumBytesRead?: number; + maximumRowsRead?: number; + numItems: number; + }; + workflowId: string; + }, + { + continueCursor: string; + isDone: boolean; + page: Array<{ + args: any; + completedAt?: number; + eventId?: string; + kind: "function" | "workflow" | "event"; + name: string; + nestedWorkflowId?: string; + runResult?: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + startedAt: number; + stepId: string; + stepNumber: number; + workId?: string; + workflowId: string; + }>; + pageStatus?: "SplitRecommended" | "SplitRequired" | null; + splitCursor?: string | null; + } + >; }; }; }; diff --git a/example/convex/nestedWorkflow.ts b/example/convex/nestedWorkflow.ts new file mode 100644 index 0000000..ae4590f --- /dev/null +++ b/example/convex/nestedWorkflow.ts @@ -0,0 +1,37 @@ +import { v } from "convex/values"; +import { workflow } from "./example"; +import { internal } from "./_generated/api"; +import { internalMutation } from "./_generated/server"; + +export const parentWorkflow = workflow.define({ + args: { prompt: v.string() }, + handler: async (ctx, args) => { + console.log("Starting confirmation workflow"); + const length = await ctx.runWorkflow( + internal.nestedWorkflow.childWorkflow, + { foo: args.prompt }, + ); + console.log("Length:", length); + const stepResult = await ctx.runMutation(internal.nestedWorkflow.step, { + foo: args.prompt, + }); + console.log("Step result:", stepResult); + }, +}); + +export const childWorkflow = workflow.define({ + args: { foo: v.string() }, + returns: v.number(), + handler: async (_ctx, args) => { + console.log("Starting nested workflow"); + return args.foo.length; + }, +}); + +export const step = internalMutation({ + args: { foo: v.string() }, + handler: async (_ctx, args) => { + console.log("Starting step"); + return args.foo.length; + }, +}); diff --git a/example/convex/passingSignals.ts b/example/convex/passingSignals.ts new file mode 100644 index 0000000..c6b947b --- /dev/null +++ b/example/convex/passingSignals.ts @@ -0,0 +1,45 @@ +import { vWorkflowId, WorkflowManager } from "@convex-dev/workflow"; +import { components, internal } from "./_generated/api"; +import { internalMutation } from "./_generated/server"; +import { vEventId } from "../../src/types"; + +const workflow = new WorkflowManager(components.workflow); + +export const signalBasedWorkflow = workflow.define({ + args: {}, + handler: async (ctx) => { + console.log("Starting signal based workflow"); + for (let i = 0; i < 3; i++) { + const signalId = await ctx.runMutation( + internal.passingSignals.createSignal, + { workflowId: ctx.workflowId }, + ); + await ctx.awaitEvent({ id: signalId }); + console.log("Signal received", signalId); + } + console.log("All signals received"); + }, +}); + +export const createSignal = internalMutation({ + args: { workflowId: vWorkflowId }, + handler: async (ctx, args) => { + const eventId = await workflow.createEvent(ctx, { + name: "signal", + workflowId: args.workflowId, + }); + // You would normally store this eventId somewhere to be able to send the + // signal later. + await ctx.scheduler.runAfter(1000, internal.passingSignals.sendSignal, { + eventId, + }); + return eventId; + }, +}); + +export const sendSignal = internalMutation({ + args: { eventId: vEventId("signal") }, + handler: async (ctx, args) => { + await workflow.sendEvent(ctx, { id: args.eventId }); + }, +}); diff --git a/example/convex/userConfirmation.ts b/example/convex/userConfirmation.ts index 3ba95b9..fd8892b 100644 --- a/example/convex/userConfirmation.ts +++ b/example/convex/userConfirmation.ts @@ -1,49 +1,58 @@ -import { defineEvent, vWorkflowId } from "@convex-dev/workflow"; +import { + defineEvent, + vWorkflowId, + WorkflowManager, +} from "@convex-dev/workflow"; import { v } from "convex/values"; -import { internal } from "./_generated/api"; -import { internalAction, mutation } from "./_generated/server"; -import { workflow } from "./example"; +import { components, internal } from "./_generated/api"; +import { internalAction, internalMutation } from "./_generated/server"; -const approvalEvent = defineEvent({ - name: "approval", +export const approvalEvent = defineEvent({ + name: "approval" as const, validator: v.union( v.object({ approved: v.literal(true), choice: v.number() }), v.object({ approved: v.literal(false), reason: v.string() }), ), }); +const workflow = new WorkflowManager(components.workflow); + export const confirmationWorkflow = workflow.define({ args: { prompt: v.string() }, returns: v.string(), - handler: async (step, args): Promise => { - const proposals = await step.runAction( + handler: async (ctx, args): Promise => { + console.log("Starting confirmation workflow"); + const proposals = await ctx.runAction( internal.userConfirmation.generateProposals, { prompt: args.prompt }, { retry: true }, ); - const approval = await step.awaitEvent(approvalEvent); + console.log("Proposals generated", proposals); + const approval = await ctx.awaitEvent(approvalEvent); if (!approval.approved) { return "rejected: " + approval.reason; } const choice = proposals[approval.choice]; + console.log("Choice selected", choice); return choice; }, }); export const generateProposals = internalAction({ args: { prompt: v.string() }, - handler: async (ctx, args) => { + handler: async (_ctx, _args) => { // imagine this is a call to an LLM return ["proposal1", "proposal2", "proposal3"]; }, }); -export const chooseProposal = mutation({ +export const chooseProposal = internalMutation({ args: { workflowId: vWorkflowId, choice: v.number() }, handler: async (ctx, args) => { - await workflow.sendEvent(ctx, args.workflowId, approvalEvent, { - approved: true, - choice: args.choice, + await workflow.sendEvent(ctx, { + ...approvalEvent, + workflowId: args.workflowId, + value: { approved: true, choice: args.choice }, }); return true; }, diff --git a/package-lock.json b/package-lock.json index 46d17c3..7ac4079 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,32 +1,33 @@ { "name": "@convex-dev/workflow", - "version": "0.2.7", + "version": "0.2.8-alpha.9", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@convex-dev/workflow", - "version": "0.2.7", + "version": "0.2.8-alpha.9", "license": "Apache-2.0", "dependencies": { "async-channel": "^0.2.0" }, "devDependencies": { - "@convex-dev/workpool": "^0.2.19-alpha.2", + "@convex-dev/workpool": "0.2.19", "@edge-runtime/vm": "5.0.0", "@eslint/eslintrc": "3.3.1", - "@eslint/js": "9.37.0", - "@types/node": "22.18.8", + "@eslint/js": "9.38.0", + "@types/node": "22.18.12", "@typescript-eslint/eslint-plugin": "8.40.0", "@typescript-eslint/parser": "8.40.0", "chokidar-cli": "3.0.0", - "convex": "^1.27.3", + "convex": "1.28.0", + "convex-helpers": "0.1.102", "convex-test": "0.0.38", "cpy-cli": "6.0.0", - "eslint": "9.37.0", + "eslint": "9.38.0", "globals": "16.4.0", "npm-run-all2": "8.0.4", - "openai": "6.2.0", + "openai": "6.6.0", "pkg-pr-new": "0.0.60", "prettier": "3.6.2", "typescript": "5.9.3", @@ -34,7 +35,7 @@ "vitest": "3.2.4" }, "peerDependencies": { - "@convex-dev/workpool": "^0.2.18", + "@convex-dev/workpool": "^0.2.19", "convex": ">=1.25.0 <1.35.0", "convex-helpers": "^0.1.99" } @@ -92,9 +93,9 @@ "license": "MIT" }, "node_modules/@convex-dev/workpool": { - "version": "0.2.19-alpha.2", - "resolved": "https://registry.npmjs.org/@convex-dev/workpool/-/workpool-0.2.19-alpha.2.tgz", - "integrity": "sha512-OrrU8x69SKTr3XbRRg4RUuOnRcD5Al3nii66OA48ZNYRJrswJ5SzZ9h9vKNYH7L66M3nqXCm9BEfiXbQMdvYUQ==", + "version": "0.2.19", + "resolved": "https://registry.npmjs.org/@convex-dev/workpool/-/workpool-0.2.19.tgz", + "integrity": "sha512-U2KwYnsKILyxW1baWEhDv+ZtnL5FZbYFxBT5owQ0Lw/kseiudMZraA4clH+/6gowHSahWpkq4wndhcOfpfhuOA==", "dev": true, "license": "Apache-2.0", "peerDependencies": { @@ -118,6 +119,7 @@ "integrity": "sha512-NKBGBSIKUG584qrS1tyxVpX/AKJKQw5HgjYEnPLC0QsTw79JrGn+qUr8CXFb955Iy7GUdiiUv1rJ6JBGvaKb6w==", "dev": true, "license": "MIT", + "peer": true, "dependencies": { "@edge-runtime/primitives": "6.0.0" }, @@ -132,6 +134,7 @@ "cpu": [ "ppc64" ], + "dev": true, "license": "MIT", "optional": true, "os": [ @@ -148,6 +151,7 @@ "cpu": [ "arm" ], + "dev": true, "license": "MIT", "optional": true, "os": [ @@ -164,6 +168,7 @@ "cpu": [ "arm64" ], + "dev": true, "license": "MIT", "optional": true, "os": [ @@ -180,6 +185,7 @@ "cpu": [ "x64" ], + "dev": true, "license": "MIT", "optional": true, "os": [ @@ -196,6 +202,7 @@ "cpu": [ "arm64" ], + "dev": true, "license": "MIT", "optional": true, "os": [ @@ -212,6 +219,7 @@ "cpu": [ "x64" ], + "dev": true, "license": "MIT", "optional": true, "os": [ @@ -228,6 +236,7 @@ "cpu": [ "arm64" ], + "dev": true, "license": "MIT", "optional": true, "os": [ @@ -244,6 +253,7 @@ "cpu": [ "x64" ], + "dev": true, "license": "MIT", "optional": true, "os": [ @@ -260,6 +270,7 @@ "cpu": [ "arm" ], + "dev": true, "license": "MIT", "optional": true, "os": [ @@ -276,6 +287,7 @@ "cpu": [ "arm64" ], + "dev": true, "license": "MIT", "optional": true, "os": [ @@ -292,6 +304,7 @@ "cpu": [ "ia32" ], + "dev": true, "license": "MIT", "optional": true, "os": [ @@ -308,6 +321,7 @@ "cpu": [ "loong64" ], + "dev": true, "license": "MIT", "optional": true, "os": [ @@ -324,6 +338,7 @@ "cpu": [ "mips64el" ], + "dev": true, "license": "MIT", "optional": true, "os": [ @@ -340,6 +355,7 @@ "cpu": [ "ppc64" ], + "dev": true, "license": "MIT", "optional": true, "os": [ @@ -356,6 +372,7 @@ "cpu": [ "riscv64" ], + "dev": true, "license": "MIT", "optional": true, "os": [ @@ -372,6 +389,7 @@ "cpu": [ "s390x" ], + "dev": true, "license": "MIT", "optional": true, "os": [ @@ -388,6 +406,7 @@ "cpu": [ "x64" ], + "dev": true, "license": "MIT", "optional": true, "os": [ @@ -404,6 +423,7 @@ "cpu": [ "arm64" ], + "dev": true, "license": "MIT", "optional": true, "os": [ @@ -420,6 +440,7 @@ "cpu": [ "x64" ], + "dev": true, "license": "MIT", "optional": true, "os": [ @@ -436,6 +457,7 @@ "cpu": [ "arm64" ], + "dev": true, "license": "MIT", "optional": true, "os": [ @@ -452,6 +474,7 @@ "cpu": [ "x64" ], + "dev": true, "license": "MIT", "optional": true, "os": [ @@ -468,6 +491,7 @@ "cpu": [ "x64" ], + "dev": true, "license": "MIT", "optional": true, "os": [ @@ -484,6 +508,7 @@ "cpu": [ "arm64" ], + "dev": true, "license": "MIT", "optional": true, "os": [ @@ -500,6 +525,7 @@ "cpu": [ "ia32" ], + "dev": true, "license": "MIT", "optional": true, "os": [ @@ -516,6 +542,7 @@ "cpu": [ "x64" ], + "dev": true, "license": "MIT", "optional": true, "os": [ @@ -566,13 +593,13 @@ } }, "node_modules/@eslint/config-array": { - "version": "0.21.0", - "resolved": "https://registry.npmjs.org/@eslint/config-array/-/config-array-0.21.0.tgz", - "integrity": "sha512-ENIdc4iLu0d93HeYirvKmrzshzofPw6VkZRKQGe9Nv46ZnWUzcF1xV01dcvEg/1wXUR61OmmlSfyeyO7EvjLxQ==", + "version": "0.21.1", + "resolved": "https://registry.npmjs.org/@eslint/config-array/-/config-array-0.21.1.tgz", + "integrity": "sha512-aw1gNayWpdI/jSYVgzN5pL0cfzU02GT3NBpeT/DXbx1/1x7ZKxFPd9bwrzygx/qiwIQiJ1sw/zD8qY/kRvlGHA==", "dev": true, "license": "Apache-2.0", "dependencies": { - "@eslint/object-schema": "^2.1.6", + "@eslint/object-schema": "^2.1.7", "debug": "^4.3.1", "minimatch": "^3.1.2" }, @@ -581,9 +608,9 @@ } }, "node_modules/@eslint/config-helpers": { - "version": "0.4.0", - "resolved": "https://registry.npmjs.org/@eslint/config-helpers/-/config-helpers-0.4.0.tgz", - "integrity": "sha512-WUFvV4WoIwW8Bv0KeKCIIEgdSiFOsulyN0xrMu+7z43q/hkOLXjvb5u7UC9jDxvRzcrbEmuZBX5yJZz1741jog==", + "version": "0.4.1", + "resolved": "https://registry.npmjs.org/@eslint/config-helpers/-/config-helpers-0.4.1.tgz", + "integrity": "sha512-csZAzkNhsgwb0I/UAV6/RGFTbiakPCf0ZrGmrIxQpYvGZ00PhTkSnyKNolphgIvmnJeGw6rcGVEXfTzUnFuEvw==", "dev": true, "license": "Apache-2.0", "dependencies": { @@ -644,9 +671,9 @@ } }, "node_modules/@eslint/js": { - "version": "9.37.0", - "resolved": "https://registry.npmjs.org/@eslint/js/-/js-9.37.0.tgz", - "integrity": "sha512-jaS+NJ+hximswBG6pjNX0uEJZkrT0zwpVi3BA3vX22aFGjJjmgSTSmPpZCRKmoBL5VY/M6p0xsSJx7rk7sy5gg==", + "version": "9.38.0", + "resolved": "https://registry.npmjs.org/@eslint/js/-/js-9.38.0.tgz", + "integrity": "sha512-UZ1VpFvXf9J06YG9xQBdnzU+kthors6KjhMAl6f4gH4usHyh31rUf2DLGInT8RFYIReYXNSydgPY0V2LuWgl7A==", "dev": true, "license": "MIT", "engines": { @@ -657,9 +684,9 @@ } }, "node_modules/@eslint/object-schema": { - "version": "2.1.6", - "resolved": "https://registry.npmjs.org/@eslint/object-schema/-/object-schema-2.1.6.tgz", - "integrity": "sha512-RBMg5FRL0I0gs51M/guSAj5/e14VQ4tpZnQNWwuDT66P14I43ItmPfIZRhO9fUVIPOAQXU47atlywZ/czoqFPA==", + "version": "2.1.7", + "resolved": "https://registry.npmjs.org/@eslint/object-schema/-/object-schema-2.1.7.tgz", + "integrity": "sha512-VtAOaymWVfZcmZbp6E2mympDIHvyjXs/12LqWYjVw6qjrfF+VK+fyG33kChz3nnK+SU5/NeHOqrTEHS8sXO3OA==", "dev": true, "license": "Apache-2.0", "engines": { @@ -878,6 +905,7 @@ "integrity": "sha512-dKYCMuPO1bmrpuogcjQ8z7ICCH3FP6WmxpwC03yjzGfZhj9fTJg6+bS1+UAplekbN2C+M61UNllGOOoAfGCrdQ==", "dev": true, "license": "MIT", + "peer": true, "dependencies": { "@octokit/auth-token": "^4.0.0", "@octokit/graphql": "^7.1.0", @@ -1410,11 +1438,12 @@ "license": "MIT" }, "node_modules/@types/node": { - "version": "22.18.8", - "resolved": "https://registry.npmjs.org/@types/node/-/node-22.18.8.tgz", - "integrity": "sha512-pAZSHMiagDR7cARo/cch1f3rXy0AEXwsVsVH09FcyeJVAzCnGgmYis7P3JidtTUjyadhTeSo8TgRPswstghDaw==", + "version": "22.18.12", + "resolved": "https://registry.npmjs.org/@types/node/-/node-22.18.12.tgz", + "integrity": "sha512-BICHQ67iqxQGFSzfCFTT7MRQ5XcBjG5aeKh5Ok38UBbPe5fxTyE+aHFxwVrGyr8GNlqFMLKD1D3P2K/1ks8tog==", "dev": true, "license": "MIT", + "peer": true, "dependencies": { "undici-types": "~6.21.0" } @@ -1465,6 +1494,7 @@ "integrity": "sha512-jCNyAuXx8dr5KJMkecGmZ8KI61KBUhkCob+SD+C+I5+Y1FWI2Y3QmY4/cxMCC5WAsZqoEtEETVhUiUMIGCf6Bw==", "dev": true, "license": "MIT", + "peer": true, "dependencies": { "@typescript-eslint/scope-manager": "8.40.0", "@typescript-eslint/types": "8.40.0", @@ -1798,6 +1828,7 @@ "integrity": "sha512-NZyJarBfL7nWwIq+FDL6Zp/yHEhePMNnnJ0y3qfieCrmNvYct8uvtiV41UvlSe6apAfk0fY1FbWx+NwfmpvtTg==", "dev": true, "license": "MIT", + "peer": true, "bin": { "acorn": "bin/acorn" }, @@ -2131,10 +2162,11 @@ "license": "MIT" }, "node_modules/convex": { - "version": "1.27.4", - "resolved": "https://registry.npmjs.org/convex/-/convex-1.27.4.tgz", - "integrity": "sha512-aPP3uxOF5v+K4uftXxRh8GAYepsjsFgU+S9IpAyLVNaFU3Z72WB1rIhaSzPAo4Q0TJWsOKANFGU903IU92QDTA==", + "version": "1.28.0", + "resolved": "https://registry.npmjs.org/convex/-/convex-1.28.0.tgz", + "integrity": "sha512-40FgeJ/LxP9TxnkDDztU/A5gcGTdq1klcTT5mM0Ak+kSlQiDktMpjNX1TfkWLxXaE3lI4qvawKH95v2RiYgFxA==", "license": "Apache-2.0", + "peer": true, "dependencies": { "esbuild": "0.25.4", "prettier": "^3.0.0" @@ -2164,21 +2196,21 @@ } }, "node_modules/convex-helpers": { - "version": "0.1.99", - "resolved": "https://registry.npmjs.org/convex-helpers/-/convex-helpers-0.1.99.tgz", - "integrity": "sha512-W4sV9676vWWIwfYvG76Dxf7biDgpYggvwTLW5fJgLhXIb/XUCacO2AOXu+HrW85GvPRb1LLjhWgWPH8byHiTsw==", + "version": "0.1.102", + "resolved": "https://registry.npmjs.org/convex-helpers/-/convex-helpers-0.1.102.tgz", + "integrity": "sha512-FISEUHjTKZFk4GE2jZZt6AvTFZCrzoSW6aXJUTY5IIlfuFvJCry3i6YMvOC3lL6ivR75lVEsGt56rwGP2kCcNQ==", + "dev": true, "license": "Apache-2.0", - "peer": true, "bin": { "convex-helpers": "bin.cjs" }, "peerDependencies": { "@standard-schema/spec": "^1.0.0", - "convex": "^1.13.0", + "convex": "^1.24.0", "hono": "^4.0.5", "react": "^17.0.2 || ^18.0.0 || ^19.0.0", "typescript": "^5.5", - "zod": "^3.22.4" + "zod": "^3.22.4 || ^4.0.15" }, "peerDependenciesMeta": { "@standard-schema/spec": { @@ -2359,6 +2391,7 @@ "version": "0.25.4", "resolved": "https://registry.npmjs.org/esbuild/-/esbuild-0.25.4.tgz", "integrity": "sha512-8pgjLUcUjcgDg+2Q4NYXnPbo/vncAY4UmyaCm0jZevERqCHZIaWwdJHkf8XQtu4AxSKCdvrUbT0XUr1IdZzI8Q==", + "dev": true, "hasInstallScript": true, "license": "MIT", "bin": { @@ -2408,25 +2441,25 @@ } }, "node_modules/eslint": { - "version": "9.37.0", - "resolved": "https://registry.npmjs.org/eslint/-/eslint-9.37.0.tgz", - "integrity": "sha512-XyLmROnACWqSxiGYArdef1fItQd47weqB7iwtfr9JHwRrqIXZdcFMvvEcL9xHCmL0SNsOvF0c42lWyM1U5dgig==", + "version": "9.38.0", + "resolved": "https://registry.npmjs.org/eslint/-/eslint-9.38.0.tgz", + "integrity": "sha512-t5aPOpmtJcZcz5UJyY2GbvpDlsK5E8JqRqoKtfiKE3cNh437KIqfJr3A3AKf5k64NPx6d0G3dno6XDY05PqPtw==", "dev": true, "license": "MIT", + "peer": true, "dependencies": { "@eslint-community/eslint-utils": "^4.8.0", "@eslint-community/regexpp": "^4.12.1", - "@eslint/config-array": "^0.21.0", - "@eslint/config-helpers": "^0.4.0", + "@eslint/config-array": "^0.21.1", + "@eslint/config-helpers": "^0.4.1", "@eslint/core": "^0.16.0", "@eslint/eslintrc": "^3.3.1", - "@eslint/js": "9.37.0", + "@eslint/js": "9.38.0", "@eslint/plugin-kit": "^0.4.0", "@humanfs/node": "^0.16.6", "@humanwhocodes/module-importer": "^1.0.1", "@humanwhocodes/retry": "^0.4.2", "@types/estree": "^1.0.6", - "@types/json-schema": "^7.0.15", "ajv": "^6.12.4", "chalk": "^4.0.0", "cross-spawn": "^7.0.6", @@ -3261,9 +3294,9 @@ } }, "node_modules/openai": { - "version": "6.2.0", - "resolved": "https://registry.npmjs.org/openai/-/openai-6.2.0.tgz", - "integrity": "sha512-qqjzHls7F5xkXNGy9P1Ei1rorI5LWupUUFWP66zPU8FlZbiITX8SFcHMKNZg/NATJ0LpIZcMUFxSwQmdeQPwSw==", + "version": "6.6.0", + "resolved": "https://registry.npmjs.org/openai/-/openai-6.6.0.tgz", + "integrity": "sha512-1yWk4cBsHF5Bq9TreHYOHY7pbqdlT74COnm8vPx7WKn36StS+Hyk8DdAitnLaw67a5Cudkz5EmlFQjSrNnrA2w==", "dev": true, "license": "Apache-2.0", "bin": { @@ -3563,6 +3596,7 @@ "version": "3.6.2", "resolved": "https://registry.npmjs.org/prettier/-/prettier-3.6.2.tgz", "integrity": "sha512-I7AIg5boAr5R0FFtJ6rCfD+LFsWHp81dolrFD8S79U9tb8Az2nGrJncnMSnys+bpQJfRUzqs9hnA81OAA3hCuQ==", + "dev": true, "license": "MIT", "bin": { "prettier": "bin/prettier.cjs" @@ -4022,6 +4056,7 @@ "integrity": "sha512-M7BAV6Rlcy5u+m6oPhAPFgJTzAioX/6B0DxyvDlo9l8+T3nLKbrczg2WLUyzd45L8RqfUMyGPzekbMvX2Ldkwg==", "dev": true, "license": "MIT", + "peer": true, "engines": { "node": ">=12" }, @@ -4120,8 +4155,9 @@ "version": "5.9.3", "resolved": "https://registry.npmjs.org/typescript/-/typescript-5.9.3.tgz", "integrity": "sha512-jl1vZzPDinLr9eUt3J/t7V6FgNEw9QjvBPdysz9KfQDD41fQrC2Y4vKQdiaUpFT4bXlb1RHhLpp8wtm6M5TgSw==", - "devOptional": true, + "dev": true, "license": "Apache-2.0", + "peer": true, "bin": { "tsc": "bin/tsc", "tsserver": "bin/tsserver" @@ -4233,6 +4269,7 @@ "integrity": "sha512-cZn6NDFE7wdTpINgs++ZJ4N49W2vRp8LCKrn3Ob1kYNtOo21vfDoaV5GzBfLU4MovSAB8uNRm4jgzVQZ+mBzPQ==", "dev": true, "license": "MIT", + "peer": true, "dependencies": { "esbuild": "^0.25.0", "fdir": "^6.4.4", @@ -4346,6 +4383,7 @@ "integrity": "sha512-M7BAV6Rlcy5u+m6oPhAPFgJTzAioX/6B0DxyvDlo9l8+T3nLKbrczg2WLUyzd45L8RqfUMyGPzekbMvX2Ldkwg==", "dev": true, "license": "MIT", + "peer": true, "engines": { "node": ">=12" }, @@ -4679,8 +4717,9 @@ "version": "3.25.76", "resolved": "https://registry.npmjs.org/zod/-/zod-3.25.76.tgz", "integrity": "sha512-gzUt/qt81nXsFGKIFcC3YnfEAx5NkunCfnDlvuBSSFS02bcXu4Lmea0AFIUwbLWxWPx3d9p8S5QoaujKcNQxcQ==", - "devOptional": true, + "dev": true, "license": "MIT", + "peer": true, "funding": { "url": "https://github.com/sponsors/colinhacks" } diff --git a/package.json b/package.json index f1475b1..6c5dea1 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@convex-dev/workflow", - "version": "0.2.7", + "version": "0.2.8-alpha.9", "description": "Convex component for durably executing workflows.", "keywords": [ "convex", @@ -13,24 +13,25 @@ "license": "Apache-2.0", "type": "module", "scripts": { - "example": "convex dev --typecheck-components --live-component-sources", - "dev": "run-p -r 'example' 'build:watch'", - "dashboard": "cd example && npx convex dashboard", - "all": "run-p -r 'example' 'build:watch' 'test:watch'", - "setup": "npm i && npm run build && npx convex dev --once", - "build:watch": "cd src && npx chokidar -d 1000 '../tsconfig.json' '**/*.ts' -c 'npm run build' --initial", - "build": "tsc --project ./tsconfig.build.json && npm run copy:dts && echo '{\\n \"type\": \"module\"\\n}' > dist/package.json", + "dev": "run-p -r 'dev:backend' 'dev:frontend' 'build:watch'", + "dev:backend": "convex dev --live-component-sources --typecheck-components", + "dev:frontend": "cd example && vite --clearScreen false", + "predev": "npm run dev:backend -- --until-success", + "clean": "rm -rf dist tsconfig.build.tsbuildinfo", + "build": "tsc --project ./tsconfig.build.json && npm run copy:dts", "copy:dts": "rsync -a --include='*/' --include='*.d.ts' --exclude='*' src/ dist/ || cpy 'src/**/*.d.ts' 'dist/' --parents", + "build:watch": "npx chokidar 'tsconfig*.json' 'src/**/*.ts' -i '**/*.test.ts' -c 'npm run build' --initial", "typecheck": "tsc --noEmit && tsc -p example/convex", - "clean": "rm -rf dist tsconfig.build.tsbuildinfo", - "alpha": "npm run clean && npm run build && run-p test lint typecheck && npm version prerelease --preid alpha && npm publish --tag alpha && git push --tags", - "release": "npm run clean && npm run build && run-p test lint typecheck && npm version patch && npm publish && git push --tags && git push", + "lint": "eslint src && eslint example/convex", + "all": "run-p -r 'dev:backend' 'dev:frontend' 'build:watch' 'test:watch'", "test": "vitest run --typecheck", - "test:watch": "vitest --typecheck", + "test:watch": "vitest --typecheck --clearScreen false", "test:debug": "vitest --inspect-brk --no-file-parallelism", "test:coverage": "vitest run --coverage --coverage.reporter=text", - "lint": "eslint src && eslint example/convex", - "version": "pbcopy <<<$npm_package_version; vim CHANGELOG.md && git add CHANGELOG.md" + "prepare": "npm run build", + "alpha": "npm run clean && npm ci && run-p test lint typecheck && npm version prerelease --preid alpha && npm publish --tag alpha && git push --tags", + "release": "npm run clean && npm ci && run-p test lint typecheck && npm version patch && npm publish && git push --tags", + "version": "pbcopy <<<$npm_package_version; vim CHANGELOG.md && prettier -w CHANGELOG.md && git add CHANGELOG.md" }, "files": [ "dist", @@ -51,7 +52,7 @@ } }, "peerDependencies": { - "@convex-dev/workpool": "^0.2.18", + "@convex-dev/workpool": "^0.2.19", "convex": ">=1.25.0 <1.35.0", "convex-helpers": "^0.1.99" }, @@ -59,21 +60,22 @@ "async-channel": "^0.2.0" }, "devDependencies": { - "@convex-dev/workpool": "^0.2.19-alpha.2", + "@convex-dev/workpool": "0.2.19", "@edge-runtime/vm": "5.0.0", "@eslint/eslintrc": "3.3.1", - "@eslint/js": "9.37.0", - "@types/node": "22.18.8", + "@eslint/js": "9.38.0", + "@types/node": "22.18.12", "@typescript-eslint/eslint-plugin": "8.40.0", "@typescript-eslint/parser": "8.40.0", "chokidar-cli": "3.0.0", - "convex": "^1.27.3", + "convex": "1.28.0", + "convex-helpers": "0.1.102", "convex-test": "0.0.38", "cpy-cli": "6.0.0", - "eslint": "9.37.0", + "eslint": "9.38.0", "globals": "16.4.0", "npm-run-all2": "8.0.4", - "openai": "6.2.0", + "openai": "6.6.0", "pkg-pr-new": "0.0.60", "prettier": "3.6.2", "typescript": "5.9.3", diff --git a/renovate.json b/renovate.json index 8e3387d..7abbc31 100644 --- a/renovate.json +++ b/renovate.json @@ -1,14 +1,23 @@ { "$schema": "https://docs.renovatebot.com/renovate-schema.json", + "extends": ["config:best-practices"], + "schedule": ["* 0-4 * * 1"], + "timezone": "America/Los_Angeles", + "prConcurrentLimit": 1, "packageRules": [ { + "groupName": "Routine updates", "matchUpdateTypes": ["minor", "patch", "pin", "digest"], "automerge": true }, + { + "groupName": "Major updates", + "matchUpdateTypes": ["major"], + "automerge": false + }, { "matchDepTypes": ["devDependencies"], "automerge": true } - ], - "extends": ["config:best-practices"] + ] } diff --git a/src/client/events.ts b/src/client/events.ts deleted file mode 100644 index bed12bf..0000000 --- a/src/client/events.ts +++ /dev/null @@ -1,35 +0,0 @@ -import type { EventId, EventSpec, VEventId } from "../types.js"; -import { v, type Infer, type Validator } from "convex/values"; - -/** - * Define a named event with a validator. - * @param spec - The event spec. - * @returns Utility functions to specify type-safe events and results. - */ -export function defineEvent< - Name extends string, - V extends Validator, ->(spec: { - name: Name; - validator?: V; -}): EventSpec> & { - /** - * A validator for the named event ID. - */ - vEventId: VEventId; - /** - * Use this to provide an ID to `awaitEvent` or `sendEvent`. - */ - withId: (id: EventId) => EventSpec>; -} { - return { - ...spec, - withId: (id: EventId) => ({ ...spec, id }), - vEventId: v.string() as VEventId, - }; -} - -export type TypedRunResult = - | { kind: "success"; returnValue: T } - | { kind: "failed"; error: string } - | { kind: "canceled" }; diff --git a/src/client/index.ts b/src/client/index.ts index 9aac212..1a69e2a 100644 --- a/src/client/index.ts +++ b/src/client/index.ts @@ -1,7 +1,9 @@ import type { + RunResult, WorkpoolOptions, WorkpoolRetryOptions, } from "@convex-dev/workpool"; +import { parse } from "convex-helpers/validators"; import { createFunctionHandle, type FunctionArgs, @@ -10,25 +12,36 @@ import { type GenericDataModel, type GenericMutationCtx, type GenericQueryCtx, + type PaginationOptions, + type PaginationResult, type RegisteredMutation, type ReturnValueForOptionalValidator, } from "convex/server"; -import type { ObjectType, PropertyValidators, Validator } from "convex/values"; +import type { + Infer, + ObjectType, + PropertyValidators, + Validator, +} from "convex/values"; import type { Step } from "../component/schema.js"; import type { EventId, - EventSpec, OnCompleteArgs, WorkflowId, + WorkflowStep, } from "../types.js"; import { safeFunctionName } from "./safeFunctionName.js"; -import type { OpaqueIds, WorkflowComponent, WorkflowStep } from "./types.js"; +import type { OpaqueIds, WorkflowComponent } from "./types.js"; +import type { WorkflowCtx } from "./workflowContext.js"; import { workflowMutation } from "./workflowMutation.js"; -import { parse } from "convex-helpers/validators"; -export { vWorkflowId, type WorkflowId } from "../types.js"; -export type { RunOptions } from "./types.js"; -export { defineEvent } from "./events.js"; +export { + vWorkflowId, + vWorkflowStep, + type WorkflowId, + type WorkflowStep, +} from "../types.js"; +export type { RunOptions, WorkflowCtx } from "./workflowContext.js"; export type CallbackOptions = { /** @@ -67,7 +80,7 @@ export type WorkflowDefinition< > = { args?: ArgsValidator; handler: ( - step: WorkflowStep, + step: WorkflowCtx, args: ObjectType, ) => Promise>; returns?: ReturnsValidator; @@ -76,7 +89,7 @@ export type WorkflowDefinition< export type WorkflowStatus = | { type: "inProgress"; running: OpaqueIds[] } - | { type: "completed" } + | { type: "completed"; result: unknown } | { type: "canceled" } | { type: "failed"; error: string }; @@ -105,7 +118,9 @@ export class WorkflowManager { fn: "You should not call this directly, call workflow.start instead"; args: ObjectType; }, - void + ReturnsValidator extends Validator + ? Infer + : void > { return workflowMutation( this.component, @@ -185,7 +200,7 @@ export class WorkflowManager { case "failed": return { type: "failed", error: workflow.runResult.error }; case "success": - return { type: "completed" }; + return { type: "completed", result: workflow.runResult.returnValue }; } } @@ -201,6 +216,36 @@ export class WorkflowManager { }); } + /** + * List the steps in a workflow, including their name, args, return value etc. + * + * @param ctx - The Convex context from a query, mutation, or action. + * @param workflowId - The workflow ID. + * @param opts - How many steps to fetch and in what order. + * e.g. `{ order: "desc", paginationOpts: { cursor: null, numItems: 10 } }` + * will get the last 10 steps in descending order. + * Defaults to 100 steps in ascending order. + * @returns The pagination result with per-step data. + */ + async listSteps( + ctx: RunQueryCtx, + workflowId: WorkflowId, + opts?: { + order?: "asc" | "desc"; + paginationOpts?: PaginationOptions; + }, + ): Promise> { + const steps = await ctx.runQuery(this.component.workflow.listSteps, { + workflowId, + order: opts?.order ?? "asc", + paginationOpts: opts?.paginationOpts ?? { + cursor: null, + numItems: 100, + }, + }); + return steps as PaginationResult; + } + /** * Clean up a completed workflow's storage. * @@ -217,46 +262,93 @@ export class WorkflowManager { /** * Send an event to a workflow. * - * @param ctx - Either ctx from a mutation/action or a workflow step. - * @param args - The event arguments. + * @param ctx - From a mutation, action or workflow step. + * @param args - Either send an event by its ID, or by name and workflow ID. + * If you have a validator, you must provide a value. + * If you provide an error string, awaiting the event will throw an error. */ async sendEvent( ctx: RunMutationCtx, - workflowId: WorkflowId, - args: EventSpec, - ...runResult: T extends null ? [] : [T] + args: ( + | { workflowId: WorkflowId; name: Name; id?: EventId } + | { workflowId?: undefined; name?: Name; id: EventId } + ) & + ( + | { validator?: undefined; value?: T } + | { validator: Validator; value: T } + | { error: string; value?: undefined } + ), ): Promise> { - let result = { - kind: "success" as const, - returnValue: runResult[0] ?? (null as T), - }; - if (args.validator && result.kind === "success") { - result = { - ...result, - returnValue: parse(args.validator, result.returnValue), - }; - } + let result: RunResult = + "error" in args + ? { + kind: "failed", + error: args.error, + } + : { + kind: "success" as const, + returnValue: args.validator + ? parse(args.validator, args.value) + : "value" in args + ? args.value + : null, + }; return (await ctx.runMutation(this.component.event.send, { eventId: args.id, result, name: args.name, - workflowId: workflowId, + workflowId: args.workflowId, workpoolOptions: this.options?.workpoolOptions, })) as EventId; } + /** + * Create an event ahead of time, enabling awaiting a specific event by ID. + * @param ctx - From an action, mutation or workflow step. + * @param args - The name of the event and what workflow it belongs to. + * @returns The event ID, which can be used to send the event or await it. + */ async createEvent( ctx: RunMutationCtx, - component: WorkflowComponent, args: { name: Name; workflowId: WorkflowId }, ): Promise> { - return (await ctx.runMutation(component.event.create, { + return (await ctx.runMutation(this.component.event.create, { name: args.name, workflowId: args.workflowId, })) as EventId; } } +/** + * Define an event specification: a name and a validator. + * This helps share definitions between workflow.sendEvent and ctx.awaitEvent. + * e.g. + * ```ts + * const approvalEvent = defineEvent({ + * name: "approval", + * validator: v.object({ approved: v.boolean() }), + * }); + * ``` + * Then you can await it in a workflow: + * ```ts + * const result = await ctx.awaitEvent(approvalEvent); + * ``` + * And send from somewhere else: + * ```ts + * await workflow.sendEvent(ctx, { + * ...approvalEvent, + * workflowId, + * value: { approved: true }, + * }); + * ``` + */ +export function defineEvent< + Name extends string, + V extends Validator, +>(spec: { name: Name; validator: V }) { + return spec; +} + type RunQueryCtx = { runQuery: GenericQueryCtx["runQuery"]; }; diff --git a/src/client/step.ts b/src/client/step.ts index cc0d150..9fa2a7e 100644 --- a/src/client/step.ts +++ b/src/client/step.ts @@ -17,9 +17,9 @@ import { journalEntrySize, valueSize, } from "../component/schema.js"; -import type { SchedulerOptions, WorkflowComponent } from "./types.js"; +import type { WorkflowComponent } from "./types.js"; import { MAX_JOURNAL_SIZE } from "../shared.js"; -import type { EventId } from "../types.js"; +import type { EventId, SchedulerOptions } from "../types.js"; export type WorkerResult = | { type: "handlerDone"; runResult: RunResult } @@ -36,7 +36,12 @@ export type StepRequest = { } | { kind: "event"; - args: { eventId?: EventId }; + args: { eventId?: EventId }; + } + | { + kind: "workflow"; + function: FunctionReference<"mutation", "internal">; + args: unknown; }; retry: RetryBehavior | boolean | undefined; schedulerOptions: SchedulerOptions; @@ -163,15 +168,22 @@ export class StepExecutor { target.kind === "function" ? { kind: "function" as const, - ...commonFields, functionType: target.functionType, handle: await createFunctionHandle(target.function), - } - : { - kind: "event" as const, ...commonFields, - args: target.args, - }; + } + : target.kind === "workflow" + ? { + kind: "workflow" as const, + handle: await createFunctionHandle(target.function), + ...commonFields, + } + : { + kind: "event" as const, + eventId: target.args.eventId, + ...commonFields, + args: target.args, + }; return { retry: message.retry, schedulerOptions: message.schedulerOptions, diff --git a/src/client/stepContext.ts b/src/client/stepContext.ts deleted file mode 100644 index f155fbe..0000000 --- a/src/client/stepContext.ts +++ /dev/null @@ -1,99 +0,0 @@ -import { BaseChannel } from "async-channel"; -import type { - FunctionReference, - FunctionArgs, - FunctionReturnType, - FunctionType, -} from "convex/server"; -import { safeFunctionName } from "./safeFunctionName.js"; -import type { StepRequest } from "./step.js"; -import type { RetryOption } from "@convex-dev/workpool"; -import type { RunOptions, WorkflowStep } from "./types.js"; -import type { EventSpec, WorkflowId } from "../types.js"; -import { parse } from "convex-helpers/validators"; - -export class StepContext implements WorkflowStep { - constructor( - public workflowId: WorkflowId, - private sender: BaseChannel, - ) {} - - async runQuery>( - query: Query, - args: FunctionArgs, - opts?: RunOptions, - ): Promise> { - return this.runFunction("query", query, args, opts); - } - - async runMutation>( - mutation: Mutation, - args: FunctionArgs, - opts?: RunOptions, - ): Promise> { - return this.runFunction("mutation", mutation, args, opts); - } - - async runAction>( - action: Action, - args: FunctionArgs, - opts?: RunOptions & RetryOption, - ): Promise> { - return this.runFunction("action", action, args, opts); - } - - async awaitEvent( - event: EventSpec, - ): Promise { - const result = await this.run({ - name: event.name, - target: { - kind: "event", - args: { eventId: event.id }, - }, - retry: undefined, - schedulerOptions: {}, - }); - if (event.validator) { - return parse(event.validator, result); - } - return result as T; - } - - private async runFunction< - F extends FunctionReference, - >( - functionType: FunctionType, - f: F, - args: unknown, - opts?: RunOptions & RetryOption, - ): Promise { - const { name, retry, ...schedulerOptions } = opts ?? {}; - return this.run({ - name: name ?? safeFunctionName(f), - target: { - kind: "function", - functionType, - function: f, - args, - }, - retry, - schedulerOptions, - }); - } - - private async run( - request: Omit, - ): Promise { - let send: unknown; - const p = new Promise((resolve, reject) => { - send = this.sender.push({ - ...request, - resolve, - reject, - }); - }); - await send; - return p; - } -} diff --git a/src/client/types.ts b/src/client/types.ts index 1632ff8..f0f4279 100644 --- a/src/client/types.ts +++ b/src/client/types.ts @@ -1,103 +1,9 @@ -import type { RetryOption, WorkId } from "@convex-dev/workpool"; -import type { - Expand, - FunctionArgs, - FunctionReference, - FunctionReturnType, -} from "convex/server"; +import type { Expand, FunctionReference } from "convex/server"; import type { api } from "../component/_generated/api.js"; import type { GenericId } from "convex/values"; -import type { EventSpec, WorkflowId } from "../types.js"; export type WorkflowComponent = UseApi; -export type RunOptions = { - /** - * The name of the function. By default, if you pass in api.foo.bar.baz, - * it will use "foo/bar:baz" as the name. If you pass in a function handle, - * it will use the function handle directly. - */ - name?: string; -} & SchedulerOptions; - -export type SchedulerOptions = - | { - /** - * The time (ms since epoch) to run the action at. - * If not provided, the action will be run as soon as possible. - * Note: this is advisory only. It may run later. - */ - runAt?: number; - } - | { - /** - * The number of milliseconds to run the action after. - * If not provided, the action will be run as soon as possible. - * Note: this is advisory only. It may run later. - */ - runAfter?: number; - }; - -export type WorkflowStep = { - /** - * The ID of the workflow currently running. - */ - workflowId: WorkflowId; - /** - * Run a query with the given name and arguments. - * - * @param query - The query to run, like `internal.index.exampleQuery`. - * @param args - The arguments to the query function. - * @param opts - Options for scheduling and naming the query. - */ - runQuery>( - query: Query, - args: FunctionArgs, - opts?: RunOptions, - ): Promise>; - - /** - * Run a mutation with the given name and arguments. - * - * @param mutation - The mutation to run, like `internal.index.exampleMutation`. - * @param args - The arguments to the mutation function. - * @param opts - Options for scheduling and naming the mutation. - */ - runMutation>( - mutation: Mutation, - args: FunctionArgs, - opts?: RunOptions, - ): Promise>; - - /** - * Run an action with the given name and arguments. - * - * @param action - The action to run, like `internal.index.exampleAction`. - * @param args - The arguments to the action function. - * @param opts - Options for retrying, scheduling and naming the action. - */ - runAction>( - action: Action, - args: FunctionArgs, - opts?: RunOptions & RetryOption, - ): Promise>; - - /** - * Blocks until a matching event is sent to this workflow. - * - * If an ID is specified, an event with that ID must already exist and must - * not already be "awaited" or "consumed". - * - * If a name is specified, the first available event is consumed that matches - * the name. If there is no available event, it will create one with that name - * with status "awaited". - * @param event - */ - awaitEvent( - event: EventSpec, - ): Promise; -}; - export type UseApi = Expand<{ [mod in keyof API]: API[mod] extends FunctionReference< infer FType, @@ -119,10 +25,14 @@ export type UseApi = Expand<{ export type OpaqueIds = T extends GenericId ? string - : T extends WorkId - ? string + : T extends string + ? `${T}` extends T + ? T + : string : T extends (infer U)[] ? OpaqueIds[] - : T extends object - ? { [K in keyof T]: OpaqueIds } - : T; + : T extends ArrayBuffer + ? ArrayBuffer + : T extends object + ? { [K in keyof T]: OpaqueIds } + : T; diff --git a/src/client/workflowContext.ts b/src/client/workflowContext.ts new file mode 100644 index 0000000..ea6a1aa --- /dev/null +++ b/src/client/workflowContext.ts @@ -0,0 +1,189 @@ +import type { RetryOption } from "@convex-dev/workpool"; +import { BaseChannel } from "async-channel"; +import { parse } from "convex-helpers/validators"; +import type { + FunctionArgs, + FunctionReference, + FunctionReturnType, + FunctionType, +} from "convex/server"; +import type { Validator } from "convex/values"; +import type { EventId, SchedulerOptions, WorkflowId } from "../types.js"; +import { safeFunctionName } from "./safeFunctionName.js"; +import type { StepRequest } from "./step.js"; + +export type RunOptions = { + /** + * The name of the function. By default, if you pass in api.foo.bar.baz, + * it will use "foo/bar:baz" as the name. If you pass in a function handle, + * it will use the function handle directly. + */ + name?: string; +} & SchedulerOptions; + +export type WorkflowCtx = { + /** + * The ID of the workflow currently running. + */ + workflowId: WorkflowId; + /** + * Run a query with the given name and arguments. + * + * @param query - The query to run, like `internal.index.exampleQuery`. + * @param args - The arguments to the query function. + * @param opts - Options for scheduling and naming the query. + */ + runQuery>( + query: Query, + args: FunctionArgs, + opts?: RunOptions, + ): Promise>; + + /** + * Run a mutation with the given name and arguments. + * + * @param mutation - The mutation to run, like `internal.index.exampleMutation`. + * @param args - The arguments to the mutation function. + * @param opts - Options for scheduling and naming the mutation. + */ + runMutation>( + mutation: Mutation, + args: FunctionArgs, + opts?: RunOptions, + ): Promise>; + + /** + * Run an action with the given name and arguments. + * + * @param action - The action to run, like `internal.index.exampleAction`. + * @param args - The arguments to the action function. + * @param opts - Options for retrying, scheduling and naming the action. + */ + runAction>( + action: Action, + args: FunctionArgs, + opts?: RunOptions & RetryOption, + ): Promise>; + + /** + * Run a workflow with the given name and arguments. + * + * @param workflow - The workflow to run, like `internal.index.exampleWorkflow`. + * @param args - The arguments to the workflow function. + * @param opts - Options for retrying, scheduling and naming the workflow. + */ + runWorkflow>( + workflow: Workflow, + args: FunctionArgs["args"], + opts?: RunOptions, + ): Promise>; + + /** + * Blocks until a matching event is sent to this workflow. + * + * If an ID is specified, an event with that ID must already exist and must + * not already be "awaited" or "consumed". + * + * If a name is specified, the first available event is consumed that matches + * the name. If there is no available event, it will create one with that name + * with status "awaited". + * @param event + */ + awaitEvent( + event: ( + | { name: Name; id?: EventId } + | { name?: Name; id: EventId } + ) & { + validator?: Validator; + }, + ): Promise; +}; + +export function createWorkflowCtx( + workflowId: WorkflowId, + sender: BaseChannel, +) { + return { + workflowId, + runQuery: async (query, args, opts?) => { + return runFunction(sender, "query", query, args, opts); + }, + + runMutation: async (mutation, args, opts?) => { + return runFunction(sender, "mutation", mutation, args, opts); + }, + + runAction: async (action, args, opts?) => { + return runFunction(sender, "action", action, args, opts); + }, + + runWorkflow: async (workflow, args, opts?) => { + const { name, ...schedulerOptions } = opts ?? {}; + return run(sender, { + name: name ?? safeFunctionName(workflow), + target: { + kind: "workflow", + function: workflow, + args, + }, + retry: undefined, + schedulerOptions, + }); + }, + + awaitEvent: async (event) => { + const result = await run(sender, { + name: event.name ?? event.id ?? "Event", + target: { + kind: "event", + args: { eventId: event.id }, + }, + retry: undefined, + schedulerOptions: {}, + }); + if (event.validator) { + return parse(event.validator, result); + } + return result as any; + }, + } satisfies WorkflowCtx; +} + +async function runFunction< + F extends FunctionReference, +>( + sender: BaseChannel, + functionType: FunctionType, + f: F, + args: unknown, + opts?: RunOptions & RetryOption, +): Promise { + const { name, retry, ...schedulerOptions } = opts ?? {}; + return run(sender, { + name: name ?? safeFunctionName(f), + target: { + kind: "function", + functionType, + function: f, + args, + }, + retry, + schedulerOptions, + }); +} + +async function run( + sender: BaseChannel, + request: Omit, +): Promise { + let send: unknown; + const p = new Promise((resolve, reject) => { + send = sender.push({ + ...request, + resolve, + reject, + }); + }); + await send; + return p; +} diff --git a/src/client/workflowMutation.ts b/src/client/workflowMutation.ts index 489aa1d..34b5136 100644 --- a/src/client/workflowMutation.ts +++ b/src/client/workflowMutation.ts @@ -18,7 +18,7 @@ import { type JournalEntry } from "../component/schema.js"; import { setupEnvironment } from "./environment.js"; import type { WorkflowDefinition } from "./index.js"; import { StepExecutor, type StepRequest, type WorkerResult } from "./step.js"; -import { StepContext } from "./stepContext.js"; +import { createWorkflowCtx } from "./workflowContext.js"; import { checkArgs } from "./validator.js"; import { type RunResult, type WorkpoolOptions } from "@convex-dev/workpool"; import { type WorkflowComponent } from "./types.js"; @@ -117,7 +117,7 @@ export function workflowMutation( const channel = new BaseChannel( workpoolOptions.maxParallelism ?? 10, ); - const step = new StepContext(workflowId, channel); + const step = createWorkflowCtx(workflowId, channel); const executor = new StepExecutor( workflowId, generationNumber, diff --git a/src/component/_generated/api.d.ts b/src/component/_generated/api.d.ts index 32f5453..4a58fb0 100644 --- a/src/component/_generated/api.d.ts +++ b/src/component/_generated/api.d.ts @@ -57,7 +57,7 @@ export type Mounts = { | { kind: "success"; returnValue: any } | { error: string; kind: "failed" } | { kind: "canceled" }; - workflowId: string; + workflowId?: string; workpoolOptions?: { defaultRetryBehavior?: { base: number; @@ -99,6 +99,21 @@ export type Mounts = { startedAt: number; workId?: string; } + | { + args: any; + argsSize: number; + completedAt?: number; + handle: string; + inProgress: boolean; + kind: "workflow"; + name: string; + runResult?: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + startedAt: number; + workflowId?: string; + } | { args: { eventId?: string }; argsSize: number; @@ -112,7 +127,6 @@ export type Mounts = { | { error: string; kind: "failed" } | { kind: "canceled" }; startedAt: number; - workId?: string; }; stepNumber: number; workflowId: string; @@ -164,6 +178,21 @@ export type Mounts = { startedAt: number; workId?: string; } + | { + args: any; + argsSize: number; + completedAt?: number; + handle: string; + inProgress: boolean; + kind: "workflow"; + name: string; + runResult?: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + startedAt: number; + workflowId?: string; + } | { args: { eventId?: string }; argsSize: number; @@ -177,7 +206,6 @@ export type Mounts = { | { error: string; kind: "failed" } | { kind: "canceled" }; startedAt: number; - workId?: string; }; }>; workflowId: string; @@ -212,6 +240,21 @@ export type Mounts = { startedAt: number; workId?: string; } + | { + args: any; + argsSize: number; + completedAt?: number; + handle: string; + inProgress: boolean; + kind: "workflow"; + name: string; + runResult?: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + startedAt: number; + workflowId?: string; + } | { args: { eventId?: string }; argsSize: number; @@ -225,7 +268,6 @@ export type Mounts = { | { error: string; kind: "failed" } | { kind: "canceled" }; startedAt: number; - workId?: string; }; stepNumber: number; workflowId: string; @@ -296,6 +338,21 @@ export type Mounts = { startedAt: number; workId?: string; } + | { + args: any; + argsSize: number; + completedAt?: number; + handle: string; + inProgress: boolean; + kind: "workflow"; + name: string; + runResult?: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + startedAt: number; + workflowId?: string; + } | { args: { eventId?: string }; argsSize: number; @@ -309,7 +366,6 @@ export type Mounts = { | { error: string; kind: "failed" } | { kind: "canceled" }; startedAt: number; - workId?: string; }; stepNumber: number; workflowId: string; @@ -333,6 +389,45 @@ export type Mounts = { }; } >; + listSteps: FunctionReference< + "query", + "public", + { + order: "asc" | "desc"; + paginationOpts: { + cursor: string | null; + endCursor?: string | null; + id?: number; + maximumBytesRead?: number; + maximumRowsRead?: number; + numItems: number; + }; + workflowId: string; + }, + { + continueCursor: string; + isDone: boolean; + page: Array<{ + args: any; + completedAt?: number; + eventId?: string; + kind: "function" | "workflow" | "event"; + name: string; + nestedWorkflowId?: string; + runResult?: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + startedAt: number; + stepId: string; + stepNumber: number; + workId?: string; + workflowId: string; + }>; + pageStatus?: "SplitRecommended" | "SplitRequired" | null; + splitCursor?: string | null; + } + >; }; }; // For now fullApiWithMounts is only fullApi which provides @@ -366,6 +461,7 @@ export declare const components: { "internal", { before?: number; + limit?: number; logLevel: "DEBUG" | "TRACE" | "INFO" | "REPORT" | "WARN" | "ERROR"; }, any diff --git a/src/component/event.ts b/src/component/event.ts index cf5ae72..a002316 100644 --- a/src/component/event.ts +++ b/src/component/event.ts @@ -58,7 +58,6 @@ export async function awaitEvent( } assert(entry.step.kind === "event", "Step is not an event"); entry.step.eventId = event._id; - await ctx.db.replace(entry._id, entry); // if there's a name, see if there's one to consume. // if it's there, mark it consumed and swap in the result. return entry; @@ -66,7 +65,7 @@ export async function awaitEvent( async function getOrCreateEvent( ctx: MutationCtx, - workflowId: Id<"workflows">, + workflowId: Id<"workflows"> | undefined, args: { eventId?: Id<"events">; name?: string }, statuses: Doc<"events">["state"]["kind"][], ): Promise> { @@ -80,6 +79,7 @@ async function getOrCreateEvent( return event; } assert(args.name, "Name is required if eventId is not specified"); + assert(workflowId, "workflowId is required if eventId is not specified"); for (const status of statuses) { const event = await ctx.db .query("events") @@ -102,7 +102,7 @@ async function getOrCreateEvent( export const send = mutation({ args: { - workflowId: v.id("workflows"), + workflowId: v.optional(v.id("workflows")), eventId: v.optional(v.id("events")), name: v.optional(v.string()), result: vResultValidator, @@ -119,16 +119,17 @@ export const send = mutation({ }, ["waiting", "created"], ); + const { workflowId } = event; const name = args.name ?? event.name; switch (event.state.kind) { case "sent": { throw new Error( - `Event already sent: ${event._id} (${name}) in workflow ${args.workflowId}`, + `Event already sent: ${event._id} (${name}) in workflow ${workflowId}`, ); } case "consumed": { throw new Error( - `Event already consumed: ${event._id} (${name}) in workflow ${args.workflowId}`, + `Event already consumed: ${event._id} (${name}) in workflow ${workflowId}`, ); } case "created": { @@ -141,7 +142,7 @@ export const send = mutation({ const step = await ctx.db.get(event.state.stepId); assert( step, - `Entry ${event.state.stepId} not found when sending event ${event._id} (${name}) in workflow ${args.workflowId}`, + `Entry ${event.state.stepId} not found when sending event ${event._id} (${name}) in workflow ${workflowId}`, ); assert(step.step.kind === "event", "Step is not an event"); step.step.eventId = event._id; @@ -161,13 +162,13 @@ export const send = mutation({ const anyMoreEvents = await ctx.db .query("events") .withIndex("workflowId_state", (q) => - q.eq("workflowId", args.workflowId).eq("state.kind", "waiting"), + q.eq("workflowId", workflowId).eq("state.kind", "waiting"), ) .order("desc") .first(); if (!anyMoreEvents) { - const workflow = await ctx.db.get(args.workflowId); - assert(workflow, `Workflow ${args.workflowId} not found`); + const workflow = await ctx.db.get(workflowId); + assert(workflow, `Workflow ${workflowId} not found`); const workpool = await getWorkpool(ctx, args.workpoolOptions); await enqueueWorkflow(ctx, workflow, workpool); } diff --git a/src/component/journal.ts b/src/component/journal.ts index 8175423..62aeafd 100644 --- a/src/component/journal.ts +++ b/src/component/journal.ts @@ -16,11 +16,12 @@ import { workpoolOptions, } from "./pool.js"; import { internal } from "./_generated/api.js"; -import { type FunctionHandle } from "convex/server"; +import { createFunctionHandle, type FunctionHandle } from "convex/server"; import { getDefaultLogger } from "./utils.js"; import { assert } from "convex-helpers"; import { MAX_JOURNAL_SIZE } from "../shared.js"; import { awaitEvent } from "./event.js"; +import { createHandler } from "./workflow.js"; export const load = query({ args: { @@ -53,7 +54,7 @@ export const load = query({ blocked: true, workflow, logLevel, - ok: false, + ok: true, }; } } @@ -111,15 +112,16 @@ export const startSteps = mutation({ const entries = await Promise.all( args.steps.map(async (stepArgs, index) => { - const { step, retry, schedulerOptions } = stepArgs; + const { retry, schedulerOptions } = stepArgs; const stepNumber = stepNumberBase + index; const stepId = await ctx.db.insert("steps", { workflowId: workflow._id, stepNumber, - step, + step: stepArgs.step, }); let entry = await ctx.db.get(stepId); assert(entry, "Step not found"); + const step = entry.step; const { name } = step; if (step.kind === "event") { // Note: This modifies entry in place as well. @@ -127,16 +129,35 @@ export const startSteps = mutation({ name, eventId: step.args.eventId, }); - if (entry.step.runResult) { + if (step.runResult) { console.event("eventConsumed", { workflowId: entry.workflowId, workflowName: workflow.name, - status: entry.step.runResult.kind, - eventName: entry.step.name, - stepNumber: entry.stepNumber, - durationMs: entry.step.completedAt! - entry.step.startedAt, + status: step.runResult.kind, + eventName: step.name, + stepNumber: stepNumber, + durationMs: step.completedAt! - step.startedAt, }); } + } else if (step.kind === "workflow") { + const workflowId = await createHandler(ctx, { + workflowName: step.name, + workflowHandle: step.handle, + workflowArgs: step.args, + maxParallelism: args.workpoolOptions?.maxParallelism, + onComplete: { + fnHandle: await createFunctionHandle( + internal.pool.nestedWorkflowOnComplete, + ), + context: { + stepId, + generationNumber, + workpoolOptions: args.workpoolOptions, + } satisfies OnCompleteContext, + }, + startAsync: true, + }); + step.workflowId = workflowId; } else { const context: OnCompleteContext = { generationNumber, @@ -173,9 +194,9 @@ export const startSteps = mutation({ break; } } - entry.step.workId = workId; - await ctx.db.replace(entry._id, entry); + step.workId = workId; } + await ctx.db.replace(entry._id, entry); console.event("started", { workflowId: workflow._id, diff --git a/src/component/pool.ts b/src/component/pool.ts index 0663fa3..ef63afb 100644 --- a/src/component/pool.ts +++ b/src/component/pool.ts @@ -3,6 +3,8 @@ import { vRetryBehavior, vWorkIdValidator, Workpool, + type RunResult, + type WorkId, type WorkpoolOptions, } from "@convex-dev/workpool"; import { assert } from "convex-helpers"; @@ -20,6 +22,7 @@ import { getWorkflow } from "./model.js"; import { getDefaultLogger } from "./utils.js"; import { completeHandler } from "./workflow.js"; import type { Doc } from "./_generated/dataModel.js"; +import { vWorkflowId, type WorkflowId } from "../types.js"; export const workpoolOptions = v.object({ logLevel: v.optional(logLevel), @@ -70,95 +73,116 @@ export const onComplete = internalMutation({ context: v.any(), // Ensure we can catch invalid context to fail workflow. }, returns: v.null(), - handler: async (ctx, args) => { - const console = await getDefaultLogger(ctx); - const stepId = - "stepId" in args.context - ? ctx.db.normalizeId("steps", args.context.stepId) - : null; - if (!stepId) { - // Write to failures table and return - // So someone can investigate if this ever happens - console.error("Invalid onComplete context", args.context); - await ctx.db.insert("onCompleteFailures", args); - return; - } - const journalEntry = await ctx.db.get(stepId); - assert(journalEntry, `Journal entry not found: ${stepId}`); - const workflowId = journalEntry.workflowId; + handler: onCompleteHandler, +}); - if ( - !validate(onCompleteContext, args.context, { allowUnknownFields: true }) - ) { - const error = - `Invalid onComplete context for workId ${args.workId}` + - JSON.stringify(args.context); - await ctx.db.patch(workflowId, { - runResult: { - kind: "failed", - error, - }, - }); - return; - } - const { generationNumber } = args.context; - const workflow = await getWorkflow(ctx, workflowId, null); - if (workflow.generationNumber !== generationNumber) { - console.error( - `Workflow: ${workflowId} already has generation number ${workflow.generationNumber} when completing ${stepId}`, - ); - return; - } - if (!journalEntry.step.inProgress) { - console.error( - `Step finished but journal entry not in progress: ${stepId} status: ${journalEntry.step.runResult?.kind ?? "pending"}`, - ); - return; - } - journalEntry.step.inProgress = false; - journalEntry.step.completedAt = Date.now(); - switch (args.result.kind) { - case "success": - journalEntry.step.runResult = { - kind: "success", - returnValue: args.result.returnValue, - }; - break; - case "failed": - journalEntry.step.runResult = { - kind: "failed", - error: args.result.error, - }; - break; - case "canceled": - journalEntry.step.runResult = { - kind: "canceled", - }; - break; - } - await ctx.db.replace(journalEntry._id, journalEntry); - console.debug(`Completed execution of ${stepId}`, journalEntry); +// For a nested workflow +export const nestedWorkflowOnComplete = internalMutation({ + args: { + workflowId: vWorkflowId, + result: vResultValidator, + context: v.any(), + }, + returns: v.null(), + handler: onCompleteHandler, +}); - console.event("stepCompleted", { - workflowId, - workflowName: workflow.name, - status: args.result.kind, - stepName: journalEntry.step.name, - stepNumber: journalEntry.stepNumber, - durationMs: journalEntry.step.completedAt - journalEntry.step.startedAt, +async function onCompleteHandler( + ctx: MutationCtx, + args: { + workId?: WorkId; + workflowId?: WorkflowId; + result: RunResult; + context: object; + }, +) { + const console = await getDefaultLogger(ctx); + const stepId = + "stepId" in args.context && typeof args.context.stepId === "string" + ? ctx.db.normalizeId("steps", args.context.stepId) + : null; + if (!stepId) { + // Write to failures table and return + // So someone can investigate if this ever happens + console.error("Invalid onComplete context", args.context); + await ctx.db.insert("onCompleteFailures", args); + return; + } + const journalEntry = await ctx.db.get(stepId); + assert(journalEntry, `Journal entry not found: ${stepId}`); + const workflowId = journalEntry.workflowId; + + if ( + !validate(onCompleteContext, args.context, { allowUnknownFields: true }) + ) { + const error = + `Invalid onComplete context for ${args.workId ? `workId ${args.workId}` : `nested workflowId ${args.workflowId}`}` + + JSON.stringify(args.context); + await ctx.db.patch(workflowId, { + runResult: { + kind: "failed", + error, + }, }); - if (workflow.runResult !== undefined) { - if (workflow.runResult.kind !== "canceled") { - console.error( - `Workflow: ${workflowId} already ${workflow.runResult.kind} when completing ${stepId} with status ${args.result.kind}`, - ); - } - return; + return; + } + const { generationNumber } = args.context; + const workflow = await getWorkflow(ctx, workflowId, null); + if (workflow.generationNumber !== generationNumber) { + console.error( + `Workflow: ${workflowId} already has generation number ${workflow.generationNumber} when completing ${stepId}`, + ); + return; + } + if (!journalEntry.step.inProgress) { + console.error( + `Step finished but journal entry not in progress: ${stepId} status: ${journalEntry.step.runResult?.kind ?? "pending"}`, + ); + return; + } + journalEntry.step.inProgress = false; + journalEntry.step.completedAt = Date.now(); + switch (args.result.kind) { + case "success": + journalEntry.step.runResult = { + kind: "success", + returnValue: args.result.returnValue, + }; + break; + case "failed": + journalEntry.step.runResult = { + kind: "failed", + error: args.result.error, + }; + break; + case "canceled": + journalEntry.step.runResult = { + kind: "canceled", + }; + break; + } + await ctx.db.replace(journalEntry._id, journalEntry); + console.debug(`Completed execution of ${stepId}`, journalEntry); + + console.event("stepCompleted", { + workflowId, + workflowName: workflow.name, + status: args.result.kind, + stepName: journalEntry.step.name, + stepNumber: journalEntry.stepNumber, + durationMs: journalEntry.step.completedAt - journalEntry.step.startedAt, + }); + if (workflow.runResult !== undefined) { + if (workflow.runResult.kind !== "canceled") { + console.error( + `Workflow: ${workflowId} already ${workflow.runResult.kind} when completing ${stepId} with status ${args.result.kind}`, + ); } - const workpool = await getWorkpool(ctx, args.context.workpoolOptions); - await enqueueWorkflow(ctx, workflow, workpool); - }, -}); + return; + } + const workpool = await getWorkpool(ctx, args.context.workpoolOptions); + await enqueueWorkflow(ctx, workflow, workpool); +} export async function enqueueWorkflow( ctx: MutationCtx, diff --git a/src/component/schema.ts b/src/component/schema.ts index 749afaf..0cfdd3c 100644 --- a/src/component/schema.ts +++ b/src/component/schema.ts @@ -62,7 +62,6 @@ export type Workflow = Infer; const stepCommonFields = { name: v.string(), inProgress: v.boolean(), - workId: v.optional(vWorkIdValidator), argsSize: v.number(), args: v.any(), runResult: v.optional(vResultValidator), @@ -75,6 +74,13 @@ export const step = v.union( kind: v.optional(v.literal("function")), functionType: literals("query", "mutation", "action"), handle: v.string(), + workId: v.optional(vWorkIdValidator), + ...stepCommonFields, + }), + v.object({ + kind: v.literal("workflow"), + handle: v.string(), + workflowId: v.optional(v.id("workflows")), ...stepCommonFields, }), v.object({ @@ -90,13 +96,21 @@ function stepSize(step: Step): number { let size = 0; size += step.name.length; size += 1; // inProgress - if (step.workId) { - size += step.workId.length; - } if (step.kind) size += step.kind.length; - if (step.kind !== "event") { - size += step.functionType.length; - size += step.handle.length; + switch (step.kind) { + case undefined: + case "function": + size += step.handle.length; + size += step.functionType.length; + size += step.workId?.length ?? 0; + break; + case "workflow": + size += step.handle.length; + size += step.workflowId?.length ?? 0; + break; + case "event": + size += step.eventId?.length ?? 0; + break; } size += 8 + step.argsSize; if (step.runResult) { @@ -173,7 +187,8 @@ export default defineSchema({ onCompleteFailures: defineTable( v.union( v.object({ - workId: vWorkIdValidator, + workId: v.optional(vWorkIdValidator), + workflowId: v.optional(v.string()), result: vResultValidator, context: v.any(), }), diff --git a/src/component/workflow.ts b/src/component/workflow.ts index fe03b06..3f896e0 100644 --- a/src/component/workflow.ts +++ b/src/component/workflow.ts @@ -1,65 +1,92 @@ import { vResultValidator } from "@convex-dev/workpool"; import { assert } from "convex-helpers"; -import type { FunctionHandle } from "convex/server"; +import { + paginationOptsValidator, + type FunctionHandle, + type PaginationResult, +} from "convex/server"; import { type Infer, v } from "convex/values"; import { mutation, type MutationCtx, query } from "./_generated/server.js"; import { type Logger, logLevel } from "./logging.js"; import { getWorkflow } from "./model.js"; import { getWorkpool } from "./pool.js"; -import { journalDocument, vOnComplete, workflowDocument } from "./schema.js"; +import schema, { + journalDocument, + vOnComplete, + workflowDocument, + type JournalEntry, +} from "./schema.js"; import { getDefaultLogger } from "./utils.js"; -import type { WorkflowId, OnCompleteArgs } from "../types.js"; -import { internal } from "./_generated/api.js"; +import { + type WorkflowId, + type OnCompleteArgs, + type WorkflowStep, + type EventId, + vPaginationResult, + vWorkflowStep, + type SchedulerOptions, +} from "../types.js"; +import { api, internal } from "./_generated/api.js"; import { formatErrorWithStack } from "../shared.js"; +import type { Id } from "./_generated/dataModel.js"; +import { paginator } from "convex-helpers/server/pagination"; +const createArgs = v.object({ + workflowName: v.string(), + workflowHandle: v.string(), + workflowArgs: v.any(), + maxParallelism: v.optional(v.number()), + onComplete: v.optional(vOnComplete), + startAsync: v.optional(v.boolean()), + // TODO: ttl +}); export const create = mutation({ - args: { - workflowName: v.string(), - workflowHandle: v.string(), - workflowArgs: v.any(), - maxParallelism: v.optional(v.number()), - onComplete: v.optional(vOnComplete), - startAsync: v.optional(v.boolean()), - // TODO: ttl - }, + args: createArgs, returns: v.id("workflows"), - handler: async (ctx, args) => { - const console = await getDefaultLogger(ctx); - await updateMaxParallelism(ctx, console, args.maxParallelism); - const workflowId = await ctx.db.insert("workflows", { - name: args.workflowName, - workflowHandle: args.workflowHandle, - args: args.workflowArgs, + handler: createHandler, +}); + +export async function createHandler( + ctx: MutationCtx, + args: Infer, + schedulerOptions?: SchedulerOptions, +) { + const console = await getDefaultLogger(ctx); + await updateMaxParallelism(ctx, console, args.maxParallelism); + const workflowId = await ctx.db.insert("workflows", { + name: args.workflowName, + workflowHandle: args.workflowHandle, + args: args.workflowArgs, + generationNumber: 0, + onComplete: args.onComplete, + }); + console.debug( + `Created workflow ${workflowId}:`, + args.workflowArgs, + args.workflowHandle, + ); + if (args.startAsync) { + const workpool = await getWorkpool(ctx, args); + await workpool.enqueueMutation( + ctx, + args.workflowHandle as FunctionHandle<"mutation">, + { workflowId, generationNumber: 0 }, + { + name: args.workflowName, + onComplete: internal.pool.handlerOnComplete, + context: { workflowId, generationNumber: 0 }, + ...schedulerOptions, + }, + ); + } else { + // If we can't start it, may as well not create it, eh? Fail fast... + await ctx.runMutation(args.workflowHandle as FunctionHandle<"mutation">, { + workflowId, generationNumber: 0, - onComplete: args.onComplete, }); - console.debug( - `Created workflow ${workflowId}:`, - args.workflowArgs, - args.workflowHandle, - ); - if (args.startAsync) { - const workpool = await getWorkpool(ctx, args); - await workpool.enqueueMutation( - ctx, - args.workflowHandle as FunctionHandle<"mutation">, - { workflowId, generationNumber: 0 }, - { - name: args.workflowName, - onComplete: internal.pool.handlerOnComplete, - context: { workflowId, generationNumber: 0 }, - }, - ); - } else { - // If we can't start it, may as well not create it, eh? Fail fast... - await ctx.runMutation(args.workflowHandle as FunctionHandle<"mutation">, { - workflowId, - generationNumber: 0, - }); - } - return workflowId; - }, -}); + } + return workflowId; +} export const getStatus = query({ args: { @@ -86,6 +113,60 @@ export const getStatus = query({ }, }); +function publicWorkflowId(workflowId: Id<"workflows">): WorkflowId { + return workflowId as any; +} + +function publicStep(step: JournalEntry): WorkflowStep { + return { + workflowId: publicWorkflowId(step.workflowId), + name: step.step.name, + stepId: step._id, + stepNumber: step.stepNumber, + + args: step.step.args, + runResult: step.step.runResult, + + startedAt: step.step.startedAt, + completedAt: step.step.completedAt, + + ...(step.step.kind === "event" + ? { + kind: "event", + eventId: step.step.eventId as unknown as EventId, + } + : step.step.kind === "workflow" + ? { + kind: "workflow", + nestedWorkflowId: publicWorkflowId(step.step.workflowId!), + } + : { + kind: "function", + workId: step.step.workId!, + }), + } satisfies WorkflowStep; +} + +export const listSteps = query({ + args: { + workflowId: v.id("workflows"), + order: v.union(v.literal("asc"), v.literal("desc")), + paginationOpts: paginationOptsValidator, + }, + returns: vPaginationResult(vWorkflowStep), + handler: async (ctx, args) => { + const result = await paginator(ctx.db, schema) + .query("steps") + .withIndex("workflow", (q) => q.eq("workflowId", args.workflowId)) + .order(args.order) + .paginate(args.paginationOpts); + return { + ...result, + page: result.page.map(publicStep), + } as PaginationResult>; + }, +}); + export const cancel = mutation({ args: { workflowId: v.id("workflows"), @@ -147,9 +228,17 @@ export async function completeHandler( .collect(); if (inProgress.length > 0) { const workpool = await getWorkpool(ctx, {}); - for (const step of inProgress) { - if (step.step.workId) { - await workpool.cancel(ctx, step.step.workId); + for (const { step } of inProgress) { + if (!step.kind || step.kind === "function") { + if (step.workId) { + await workpool.cancel(ctx, step.workId); + } + } else if (step.kind === "workflow") { + if (step.workflowId) { + await ctx.runMutation(api.workflow.cancel, { + workflowId: step.workflowId, + }); + } } } } diff --git a/src/test.ts b/src/test.ts index fef4c82..7a340a7 100644 --- a/src/test.ts +++ b/src/test.ts @@ -1,3 +1,19 @@ +import type { TestConvex } from "convex-test"; +import type { GenericSchema, SchemaDefinition } from "convex/server"; +import workpool from "@convex-dev/workpool/test"; import schema from "./component/schema.js"; -const modules = import.meta.glob("./**/*.ts"); -export default { schema, modules }; +const modules = import.meta.glob("./component/**/*.ts"); + +/** + * Register the component with the test convex instance. + * @param t - The test convex instance, e.g. from calling `convexTest`. + * @param name - The name of the component, as registered in convex.config.ts. + */ +function register( + t: TestConvex>, + name: string = "workflow", +) { + t.registerComponent(name, schema, modules); + workpool.register(t, `${name}/workpool`); +} +export default { register, schema, modules }; diff --git a/src/types.ts b/src/types.ts index 86d588d..f7a7d6d 100644 --- a/src/types.ts +++ b/src/types.ts @@ -1,27 +1,92 @@ -import type { RunResult } from "@convex-dev/workpool"; -import { v, type Validator, type VNull, type VString } from "convex/values"; +import { + vResultValidator, + vWorkIdValidator, + type RunResult, + type WorkId, +} from "@convex-dev/workpool"; +import { + v, + type Infer, + type Validator, + type Value, + type VString, +} from "convex/values"; export type WorkflowId = string & { __isWorkflowId: true }; export const vWorkflowId = v.string() as VString; -export type EventId = string & { +export type EventId = string & { __isEventId: true; __name: Name; }; export type VEventId = VString>; -export const vEventId = v.string() as VString>; +export const vEventId = (_name?: Name) => + v.string() as VString>; -export type EventSpec = { - name: Name; - validator?: Validator; - id?: EventId; -}; +export type WorkflowStep = { + workflowId: WorkflowId; + name: string; + stepId: string; + stepNumber: number; + + args: unknown; + runResult?: RunResult; + + startedAt: number; + completedAt?: number; +} & ( + | { kind: "function"; workId: WorkId } + | { kind: "workflow"; nestedWorkflowId: WorkflowId } + | { kind: "event"; eventId: EventId } +); + +export const vWorkflowStep = v.object({ + workflowId: vWorkflowId, + name: v.string(), + stepId: v.string(), + stepNumber: v.number(), + + args: v.any(), + runResult: v.optional(vResultValidator), + + startedAt: v.number(), + completedAt: v.optional(v.number()), + + kind: v.union( + v.literal("function"), + v.literal("workflow"), + v.literal("event"), + ), + workId: v.optional(vWorkIdValidator), + nestedWorkflowId: v.optional(vWorkflowId), + eventId: v.optional(vEventId()), +}); +// type assertion to keep us in check +const _: Infer = {} as WorkflowStep; + +export type SchedulerOptions = + | { + /** + * The time (ms since epoch) to run the action at. + * If not provided, the action will be run as soon as possible. + * Note: this is advisory only. It may run later. + */ + runAt?: number; + } + | { + /** + * The number of milliseconds to run the action after. + * If not provided, the action will be run as soon as possible. + * Note: this is advisory only. It may run later. + */ + runAfter?: number; + }; export type OnCompleteArgs = { /** * The ID of the work that completed. */ - workflowId: WorkflowId; + workflowId: string; /** * The context object passed when enqueuing the work. * Useful for passing data from the enqueue site to the onComplete site. @@ -32,3 +97,21 @@ export type OnCompleteArgs = { */ result: RunResult; }; + +export function vPaginationResult< + T extends Validator, +>(itemValidator: T) { + return v.object({ + page: v.array(itemValidator), + continueCursor: v.string(), + isDone: v.boolean(), + splitCursor: v.optional(v.union(v.string(), v.null())), + pageStatus: v.optional( + v.union( + v.literal("SplitRecommended"), + v.literal("SplitRequired"), + v.null(), + ), + ), + }); +}