From 5b3fda0fb6a3d1567e267659f40bab25067d66e2 Mon Sep 17 00:00:00 2001 From: Franco Aparicio <8digitcreative@gmail.com> Date: Fri, 6 Feb 2026 16:19:49 -0400 Subject: [PATCH] feat: add Webhook Trigger & Response nodes for external workflow execution - Add Webhook Trigger node with configurable image inputs, text, and auth secret - Add Webhook Response node accepting both image and video connections - Implement async execution with callback URL for external platforms - Propagate source URLs from all providers (WaveSpeed, Replicate, Fal) through pipeline - Keep callback payloads small by sending media URLs instead of base64 --- .gitignore | 5 +- src/app/api/generate/route.ts | 8 + src/app/api/webhook/[workflowId]/route.ts | 285 +++++++++++ src/app/api/webhook/jobs/[jobId]/route.ts | 34 ++ src/components/ConnectionDropMenu.tsx | 36 ++ src/components/FloatingActionBar.tsx | 100 +++- src/components/WorkflowCanvas.tsx | 32 +- src/components/nodes/WebhookResponseNode.tsx | 108 ++++ src/components/nodes/WebhookTriggerNode.tsx | 169 +++++++ src/components/nodes/index.ts | 2 + src/lib/executor/graphUtils.ts | 287 +++++++++++ src/lib/executor/headlessExecutor.ts | 497 +++++++++++++++++++ src/lib/executor/jobStore.ts | 74 +++ src/lib/quickstart/validation.ts | 33 +- src/store/utils/nodeDefaults.ts | 17 + src/store/workflowStore.ts | 135 +++-- src/types/api.ts | 1 + src/types/index.ts | 1 + src/types/nodes.ts | 29 +- src/types/webhook.ts | 71 +++ 20 files changed, 1817 insertions(+), 107 deletions(-) create mode 100644 src/app/api/webhook/[workflowId]/route.ts create mode 100644 src/app/api/webhook/jobs/[jobId]/route.ts create mode 100644 src/components/nodes/WebhookResponseNode.tsx create mode 100644 src/components/nodes/WebhookTriggerNode.tsx create mode 100644 src/lib/executor/graphUtils.ts create mode 100644 src/lib/executor/headlessExecutor.ts create mode 100644 src/lib/executor/jobStore.ts create mode 100644 src/types/webhook.ts diff --git a/.gitignore b/.gitignore index 
e8cce894..d2e23666 100644 --- a/.gitignore +++ b/.gitignore @@ -44,4 +44,7 @@ logs/*.json .claude/ # planning docs -.planning/ \ No newline at end of file +.planning/ + +# local files +Flujo correcto Git Push:Pull.md \ No newline at end of file diff --git a/src/app/api/generate/route.ts b/src/app/api/generate/route.ts index 33f42022..811dbc05 100644 --- a/src/app/api/generate/route.ts +++ b/src/app/api/generate/route.ts @@ -2448,6 +2448,7 @@ export async function POST(request: NextRequest) { success: true, video: isUrl ? undefined : output.data, videoUrl: isUrl ? output.data : undefined, + ...(output.url && { sourceUrl: output.url }), contentType: "video", }); } @@ -2455,6 +2456,7 @@ export async function POST(request: NextRequest) { return NextResponse.json({ success: true, image: output.data, + ...(output.url && { sourceUrl: output.url }), contentType: "image", }); } @@ -2535,6 +2537,7 @@ export async function POST(request: NextRequest) { success: true, video: isUrl ? undefined : output.data, videoUrl: isUrl ? output.data : undefined, + ...(output.url && { sourceUrl: output.url }), contentType: "video", }); } @@ -2542,6 +2545,7 @@ export async function POST(request: NextRequest) { return NextResponse.json({ success: true, image: output.data, + ...(output.url && { sourceUrl: output.url }), contentType: "image", }); } @@ -2626,6 +2630,7 @@ export async function POST(request: NextRequest) { success: true, video: isUrl ? undefined : output.data, videoUrl: isUrl ? output.data : undefined, + ...(output.url && { sourceUrl: output.url }), contentType: "video", }); } @@ -2633,6 +2638,7 @@ export async function POST(request: NextRequest) { return NextResponse.json({ success: true, image: output.data, + ...(output.url && { sourceUrl: output.url }), contentType: "image", }); } @@ -2716,6 +2722,7 @@ export async function POST(request: NextRequest) { success: true, video: isUrl ? undefined : output.data, videoUrl: isUrl ? 
output.data : undefined, + ...(output.url && { sourceUrl: output.url }), contentType: "video", }); } @@ -2723,6 +2730,7 @@ export async function POST(request: NextRequest) { return NextResponse.json({ success: true, image: output.data, + ...(output.url && { sourceUrl: output.url }), contentType: "image", }); } diff --git a/src/app/api/webhook/[workflowId]/route.ts b/src/app/api/webhook/[workflowId]/route.ts new file mode 100644 index 00000000..028195bd --- /dev/null +++ b/src/app/api/webhook/[workflowId]/route.ts @@ -0,0 +1,285 @@ +import { NextRequest, NextResponse } from "next/server"; +import * as fs from "fs/promises"; +import * as path from "path"; +import type { WebhookRequest, WebhookOutput } from "@/types/webhook"; +import type { WebhookTriggerNodeData } from "@/types"; +import type { WorkflowNode } from "@/types"; +import type { WorkflowEdge } from "@/types/workflow"; +import { + executeWorkflowHeadless, + downloadImageToBase64, +} from "@/lib/executor/headlessExecutor"; +import { createJobId, createJob, completeJob, failJob, getJob } from "@/lib/executor/jobStore"; + +export const maxDuration = 300; // 5 minutes +export const dynamic = "force-dynamic"; + +const MAX_IMAGES = 10; +const IMAGE_DOWNLOAD_TIMEOUT_MS = 30_000; + +interface WorkflowFile { + version: number; + id?: string; + name: string; + nodes: WorkflowNode[]; + edges: WorkflowEdge[]; +} + +/** + * POST /api/webhook/[workflowId] + * + * Execute a workflow via webhook. The workflowId is the workflow filename (without .json). 
+ * + * Request body: WebhookRequest + * - images: array of URLs or base64 strings (max 10) + * - text: prompt text + * - workflowDir: directory where the workflow JSON is saved + * - callbackUrl: (optional) for async execution + * - apiKeys: (optional) provider API key overrides + * + * Response: WebhookSyncResponse or WebhookAsyncResponse + */ +export async function POST( + request: NextRequest, + { params }: { params: Promise<{ workflowId: string }> } +) { + const startTime = Date.now(); + const { workflowId } = await params; + + let body: WebhookRequest & { workflowDir?: string }; + try { + body = await request.json(); + } catch { + return NextResponse.json( + { success: false, error: "Invalid JSON body" }, + { status: 400 } + ); + } + + // --- Validate request --- + if (body.images && body.images.length > MAX_IMAGES) { + return NextResponse.json( + { success: false, error: `Maximum ${MAX_IMAGES} images allowed` }, + { status: 400 } + ); + } + + // --- Locate workflow file --- + const workflowDir = + body.workflowDir || process.env.WEBHOOK_WORKFLOWS_DIR || process.cwd(); + const workflowPath = path.join(workflowDir, `${workflowId}.json`); + + let workflowFile: WorkflowFile; + try { + const raw = await fs.readFile(workflowPath, "utf-8"); + workflowFile = JSON.parse(raw); + } catch (err) { + return NextResponse.json( + { + success: false, + error: `Workflow "${workflowId}" not found at ${workflowPath}. Set workflowDir in request body or WEBHOOK_WORKFLOWS_DIR env var.`, + }, + { status: 404 } + ); + } + + // --- Find WebhookTrigger node --- + const triggerNode = workflowFile.nodes.find( + (n) => n.type === "webhookTrigger" + ); + + if (!triggerNode) { + return NextResponse.json( + { + success: false, + error: + "Workflow has no Webhook Trigger node. 
Add a webhookTrigger node to your workflow.", + }, + { status: 400 } + ); + } + + // --- Validate webhook secret --- + const triggerData = triggerNode.data as WebhookTriggerNodeData; + if (triggerData.webhookSecret) { + const authHeader = request.headers.get("authorization"); + const token = authHeader?.startsWith("Bearer ") + ? authHeader.slice(7) + : null; + + if (token !== triggerData.webhookSecret) { + return NextResponse.json( + { success: false, error: "Unauthorized: invalid or missing Bearer token" }, + { status: 401 } + ); + } + } + + // --- Pre-download all images in parallel --- + let resolvedImages: (string | null)[] = []; + if (body.images && body.images.length > 0) { + const downloadPromises = body.images.map(async (img, i) => { + if (!img) return null; + try { + const controller = new AbortController(); + const timeout = setTimeout( + () => controller.abort(), + IMAGE_DOWNLOAD_TIMEOUT_MS + ); + const result = await downloadImageToBase64(img); + clearTimeout(timeout); + return result; + } catch (err) { + throw new Error( + `Failed to download image[${i}] (${img.substring(0, 80)}...): ${err instanceof Error ? err.message : String(err)}` + ); + } + }); + + try { + resolvedImages = await Promise.all(downloadPromises); + } catch (err) { + return NextResponse.json( + { + success: false, + error: err instanceof Error ? err.message : "Image download failed", + }, + { status: 400 } + ); + } + } + + // --- Inject data into the trigger node --- + const injectedNodes = workflowFile.nodes.map((node) => { + if (node.id !== triggerNode.id) return node; + return { + ...node, + data: { + ...node.data, + images: resolvedImages, + text: body.text || null, + }, + } as WorkflowNode; + }); + + // --- Build API keys from request + env --- + const apiKeys: Record = { + ...(process.env.GEMINI_API_KEY ? { gemini: process.env.GEMINI_API_KEY } : {}), + ...(process.env.OPENAI_API_KEY ? { openai: process.env.OPENAI_API_KEY } : {}), + ...(process.env.KIE_API_KEY ? 
{ kie: process.env.KIE_API_KEY } : {}), + ...(process.env.REPLICATE_API_TOKEN ? { replicate: process.env.REPLICATE_API_TOKEN } : {}), + ...(process.env.FAL_KEY ? { fal: process.env.FAL_KEY } : {}), + ...(process.env.WAVESPEED_API_KEY ? { wavespeed: process.env.WAVESPEED_API_KEY } : {}), + ...body.apiKeys, + }; + + // --- Determine base URL for internal API calls --- + const proto = request.headers.get("x-forwarded-proto") || "http"; + const host = request.headers.get("host") || "localhost:3000"; + const baseUrl = `${proto}://${host}`; + + // --- Async mode --- + if (body.callbackUrl) { + const jobId = createJobId(); + createJob(jobId, body.callbackUrl); + + // Fire and forget - execute in background + executeInBackground( + injectedNodes, + workflowFile.edges, + apiKeys, + baseUrl, + jobId, + body.callbackUrl, + body.recordId + ); + + return NextResponse.json({ + success: true, + jobId, + status: "processing", + }); + } + + // --- Sync mode --- + try { + const outputs = await executeWorkflowHeadless({ + nodes: injectedNodes, + edges: workflowFile.edges, + apiKeys, + baseUrl, + }); + + return NextResponse.json({ + success: true, + outputs, + executionTime: Date.now() - startTime, + }); + } catch (err) { + return NextResponse.json( + { + success: false, + error: err instanceof Error ? err.message : "Execution failed", + executionTime: Date.now() - startTime, + }, + { status: 500 } + ); + } +} + +/** + * Background execution for async mode. + * Runs the workflow and sends results to the callback URL. 
+ */ +async function executeInBackground( + nodes: WorkflowNode[], + edges: WorkflowEdge[], + apiKeys: Record, + baseUrl: string, + jobId: string, + callbackUrl: string, + recordId?: string +): Promise { + try { + const outputs = await executeWorkflowHeadless({ + nodes, + edges, + apiKeys, + baseUrl, + }); + + completeJob(jobId, outputs); + + // Extract media URL for the callback (skip base64 - too large for Airtable) + const firstImage = outputs.find((o) => o.type === "image"); + const firstVideo = outputs.find((o) => o.type === "video"); + const mediaUrl: string | null = firstImage?.url || firstVideo?.url || null; + + // Send results to callback URL + await fetch(callbackUrl, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ + success: true, + jobId, + ...(recordId && { recordId }), + ...(mediaUrl && { imageUrl: mediaUrl }), + }), + }).catch(() => {}); + } catch (err) { + const errorMsg = err instanceof Error ? err.message : "Execution failed"; + failJob(jobId, errorMsg); + + // Send error to callback URL + await fetch(callbackUrl, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ + success: false, + jobId, + ...(recordId && { recordId }), + error: errorMsg, + }), + }).catch(() => {}); + } +} diff --git a/src/app/api/webhook/jobs/[jobId]/route.ts b/src/app/api/webhook/jobs/[jobId]/route.ts new file mode 100644 index 00000000..4436f687 --- /dev/null +++ b/src/app/api/webhook/jobs/[jobId]/route.ts @@ -0,0 +1,34 @@ +import { NextRequest, NextResponse } from "next/server"; +import { getJob } from "@/lib/executor/jobStore"; + +export const dynamic = "force-dynamic"; + +/** + * GET /api/webhook/jobs/[jobId] + * + * Check the status of an async webhook job. 
+ */ +export async function GET( + _request: NextRequest, + { params }: { params: Promise<{ jobId: string }> } +) { + const { jobId } = await params; + + const job = getJob(jobId); + if (!job) { + return NextResponse.json( + { success: false, error: "Job not found or expired" }, + { status: 404 } + ); + } + + return NextResponse.json({ + success: true, + jobId, + status: job.status, + ...(job.result && { outputs: job.result }), + ...(job.error && { error: job.error }), + createdAt: job.createdAt, + ...(job.completedAt && { completedAt: job.completedAt }), + }); +} diff --git a/src/components/ConnectionDropMenu.tsx b/src/components/ConnectionDropMenu.tsx index 5127de91..ca03dc92 100644 --- a/src/components/ConnectionDropMenu.tsx +++ b/src/components/ConnectionDropMenu.tsx @@ -88,6 +88,15 @@ const IMAGE_TARGET_OPTIONS: MenuOption[] = [ ), }, + { + type: "webhookResponse", + label: "Webhook Response", + icon: ( + + + + ), + }, ]; const TEXT_TARGET_OPTIONS: MenuOption[] = [ @@ -149,6 +158,15 @@ const IMAGE_SOURCE_OPTIONS: MenuOption[] = [ ), }, + { + type: "webhookTrigger", + label: "Webhook Trigger", + icon: ( + + + + ), + }, { type: "annotation", label: "Annotate", @@ -170,6 +188,15 @@ const IMAGE_SOURCE_OPTIONS: MenuOption[] = [ ]; const TEXT_SOURCE_OPTIONS: MenuOption[] = [ + { + type: "webhookTrigger", + label: "Webhook Trigger", + icon: ( + + + + ), + }, { type: "prompt", label: "Prompt", @@ -237,6 +264,15 @@ const VIDEO_TARGET_OPTIONS: MenuOption[] = [ ), }, + { + type: "webhookResponse", + label: "Webhook Response", + icon: ( + + + + ), + }, ]; // GenerateVideo and VideoStitch nodes produce video output diff --git a/src/components/FloatingActionBar.tsx b/src/components/FloatingActionBar.tsx index 69958f20..4f8dbac4 100644 --- a/src/components/FloatingActionBar.tsx +++ b/src/components/FloatingActionBar.tsx @@ -55,6 +55,104 @@ function NodeButton({ type, label }: NodeButtonProps) { ); } +function InOutComboButton() { + const [isOpen, setIsOpen] = 
useState(false); + const menuRef = useRef(null); + const addNode = useWorkflowStore((state) => state.addNode); + const { screenToFlowPosition } = useReactFlow(); + + useEffect(() => { + const handleClickOutside = (event: MouseEvent) => { + if (menuRef.current && !menuRef.current.contains(event.target as Node)) { + setIsOpen(false); + } + }; + + if (isOpen) { + document.addEventListener("mousedown", handleClickOutside); + } + + return () => { + document.removeEventListener("mousedown", handleClickOutside); + }; + }, [isOpen]); + + const handleAddNode = (type: NodeType) => { + const center = getPaneCenter(); + const position = screenToFlowPosition({ + x: center.x + Math.random() * 100 - 50, + y: center.y + Math.random() * 100 - 50, + }); + + addNode(type, position); + setIsOpen(false); + }; + + const handleDragStart = (event: React.DragEvent, type: NodeType) => { + event.dataTransfer.setData("application/node-type", type); + event.dataTransfer.effectAllowed = "copy"; + setIsOpen(false); + }; + + return ( +
+ + + {isOpen && ( +
+ + + +
+ )} +
+ ); +} + function GenerateComboButton() { const [isOpen, setIsOpen] = useState(false); const menuRef = useRef(null); @@ -241,7 +339,7 @@ export function FloatingActionBar() { - + {/* Browse models button */}
diff --git a/src/components/WorkflowCanvas.tsx b/src/components/WorkflowCanvas.tsx index 8d6d0671..a214c2d8 100644 --- a/src/components/WorkflowCanvas.tsx +++ b/src/components/WorkflowCanvas.tsx @@ -34,6 +34,8 @@ import { ImageCompareNode, VideoStitchNode, EaseCurveNode, + WebhookTriggerNode, + WebhookResponseNode, } from "./nodes"; import { EditableEdge, ReferenceEdge } from "./edges"; import { ConnectionDropMenu, MenuAction } from "./ConnectionDropMenu"; @@ -66,6 +68,8 @@ const nodeTypes: NodeTypes = { imageCompare: ImageCompareNode, videoStitch: VideoStitchNode, easeCurve: EaseCurveNode, + webhookTrigger: WebhookTriggerNode, + webhookResponse: WebhookResponseNode, }; const edgeTypes: EdgeTypes = { @@ -125,6 +129,10 @@ const getNodeHandles = (nodeType: string): { inputs: string[]; outputs: string[] return { inputs: ["video", "audio"], outputs: ["video"] }; case "easeCurve": return { inputs: ["video", "easeCurve"], outputs: ["video", "easeCurve"] }; + case "webhookTrigger": + return { inputs: [], outputs: ["image", "text"] }; + case "webhookResponse": + return { inputs: ["image", "video", "text"], outputs: [] }; default: return { inputs: [], outputs: [] }; } @@ -315,7 +323,7 @@ export function WorkflowCanvas() { if (!targetNode) return false; const targetNodeType = targetNode.type; - if (targetNodeType === "generateVideo" || targetNodeType === "videoStitch" || targetNodeType === "easeCurve" || targetNodeType === "output") { + if (targetNodeType === "generateVideo" || targetNodeType === "videoStitch" || targetNodeType === "easeCurve" || targetNodeType === "output" || targetNodeType === "webhookResponse") { // For output node, we allow video even though its handle is typed as "image" // because output node can display both images and videos return true; @@ -1034,23 +1042,7 @@ export function WorkflowCanvas() { event.preventDefault(); const { centerX, centerY } = getViewportCenter(); // Offset by half the default node dimensions to center it - const 
defaultDimensions: Record = { - imageInput: { width: 300, height: 280 }, - audioInput: { width: 300, height: 200 }, - annotation: { width: 300, height: 280 }, - prompt: { width: 320, height: 220 }, - promptConstructor: { width: 340, height: 280 }, - nanoBanana: { width: 300, height: 300 }, - generateVideo: { width: 300, height: 300 }, - llmGenerate: { width: 320, height: 360 }, - splitGrid: { width: 300, height: 320 }, - output: { width: 320, height: 320 }, - outputGallery: { width: 320, height: 360 }, - imageCompare: { width: 400, height: 360 }, - videoStitch: { width: 400, height: 280 }, - easeCurve: { width: 340, height: 480 }, - }; - const dims = defaultDimensions[nodeType]; + const dims = defaultNodeDimensions[nodeType]; addNode(nodeType, { x: centerX - dims.width / 2, y: centerY - dims.height / 2 }); return; } @@ -1603,6 +1595,10 @@ export function WorkflowCanvas() { return "#f97316"; case "easeCurve": return "#bef264"; // lime-300 (easy-peasy-ease) + case "webhookTrigger": + return "#10b981"; // emerald-500 + case "webhookResponse": + return "#f43f5e"; // rose-500 default: return "#94a3b8"; } diff --git a/src/components/nodes/WebhookResponseNode.tsx b/src/components/nodes/WebhookResponseNode.tsx new file mode 100644 index 00000000..7e4c8958 --- /dev/null +++ b/src/components/nodes/WebhookResponseNode.tsx @@ -0,0 +1,108 @@ +"use client"; + +import { useMemo } from "react"; +import { Handle, Position, NodeProps, Node } from "@xyflow/react"; +import { BaseNode } from "./BaseNode"; +import { useCommentNavigation } from "@/hooks/useCommentNavigation"; +import { useWorkflowStore } from "@/store/workflowStore"; +import { WebhookResponseNodeData } from "@/types"; + +type WebhookResponseNodeType = Node; + +export function WebhookResponseNode({ id, data, selected }: NodeProps) { + const nodeData = data; + const commentNavigation = useCommentNavigation(id); + const updateNodeData = useWorkflowStore((state) => state.updateNodeData); + + const isVideo = useMemo(() => { + 
if (nodeData.video) return true; + if (nodeData.contentType === "video") return true; + if (nodeData.image?.startsWith("data:video/")) return true; + if (nodeData.image?.includes(".mp4") || nodeData.image?.includes(".webm")) return true; + return false; + }, [nodeData.video, nodeData.contentType, nodeData.image]); + + const contentSrc = useMemo(() => { + if (nodeData.video) return nodeData.video; + return nodeData.image; + }, [nodeData.video, nodeData.image]); + + const hasContent = contentSrc || nodeData.text; + + return ( + updateNodeData(id, { customTitle: title || undefined })} + onCommentChange={(comment) => updateNodeData(id, { comment: comment || undefined })} + selected={selected} + commentNavigation={commentNavigation ?? undefined} + > + {/* Image input handle */} + + + {/* Text input handle */} + + + {hasContent ? ( +
+ {contentSrc && ( +
+ {isVideo ? ( +
+ )} + {nodeData.text && ( +
+
Text output
+
{nodeData.text}
+
+ )} +
+ ) : ( +
+ + + + Waiting for webhook data +
+ )} + + {/* Status indicator */} +
+
+ + {hasContent ? "Data captured" : "No data yet"} + +
+ + ); +} diff --git a/src/components/nodes/WebhookTriggerNode.tsx b/src/components/nodes/WebhookTriggerNode.tsx new file mode 100644 index 00000000..9088b326 --- /dev/null +++ b/src/components/nodes/WebhookTriggerNode.tsx @@ -0,0 +1,169 @@ +"use client"; + +import { useCallback, useState } from "react"; +import { Handle, Position, NodeProps, Node } from "@xyflow/react"; +import { BaseNode } from "./BaseNode"; +import { useCommentNavigation } from "@/hooks/useCommentNavigation"; +import { useWorkflowStore } from "@/store/workflowStore"; +import { WebhookTriggerNodeData } from "@/types"; + +type WebhookTriggerNodeType = Node; + +export function WebhookTriggerNode({ id, data, selected }: NodeProps) { + const nodeData = data; + const commentNavigation = useCommentNavigation(id); + const updateNodeData = useWorkflowStore((state) => state.updateNodeData); + const workflowName = useWorkflowStore((state) => state.workflowName); + const [showSecret, setShowSecret] = useState(false); + + const handleImageCountChange = useCallback( + (e: React.ChangeEvent) => { + const count = parseInt(e.target.value, 10); + // Resize images array to match new count + const images = Array.from({ length: count }, (_, i) => nodeData.images[i] ?? null); + updateNodeData(id, { imageCount: count, images }); + }, + [id, nodeData.images, updateNodeData] + ); + + const handleTextToggle = useCallback(() => { + updateNodeData(id, { hasTextOutput: !nodeData.hasTextOutput }); + }, [id, nodeData.hasTextOutput, updateNodeData]); + + const handleSecretChange = useCallback( + (e: React.ChangeEvent) => { + updateNodeData(id, { webhookSecret: e.target.value || undefined }); + }, + [id, updateNodeData] + ); + + const webhookUrl = workflowName + ? 
`/api/webhook/${encodeURIComponent(workflowName.replace(/\.json$/, ""))}` + : "/api/webhook/"; + + const examplePayload = JSON.stringify( + { + images: Array.from({ length: Math.min(nodeData.imageCount, 2) }, () => "https://example.com/image.jpg"), + ...(nodeData.hasTextOutput ? { text: "your prompt here" } : {}), + }, + null, + 2 + ); + + return ( + updateNodeData(id, { customTitle: title || undefined })} + onCommentChange={(comment) => updateNodeData(id, { comment: comment || undefined })} + selected={selected} + commentNavigation={commentNavigation ?? undefined} + > +
+ {/* Webhook URL display */} +
+
Webhook URL
+
+ POST {webhookUrl} +
+
+ + {/* Image count selector */} +
+ + +
+ + {/* Text output toggle */} +
+ + +
+ + {/* Secret token */} +
+ +
+ + +
+
+ + {/* Example payload */} +
+ + Example payload + +
+            {examplePayload}
+          
+
+
+ + {/* Image output handles */} + {Array.from({ length: nodeData.imageCount }, (_, i) => ( + + ))} + + {/* Text output handle */} + {nodeData.hasTextOutput && ( + + )} +
+ ); +} diff --git a/src/components/nodes/index.ts b/src/components/nodes/index.ts index c161655d..51853152 100644 --- a/src/components/nodes/index.ts +++ b/src/components/nodes/index.ts @@ -12,4 +12,6 @@ export { OutputGalleryNode } from "./OutputGalleryNode"; export { ImageCompareNode } from "./ImageCompareNode"; export { VideoStitchNode } from "./VideoStitchNode"; export { EaseCurveNode } from "./EaseCurveNode"; +export { WebhookTriggerNode } from "./WebhookTriggerNode"; +export { WebhookResponseNode } from "./WebhookResponseNode"; export { GroupNode } from "./GroupNode"; diff --git a/src/lib/executor/graphUtils.ts b/src/lib/executor/graphUtils.ts new file mode 100644 index 00000000..97bf0104 --- /dev/null +++ b/src/lib/executor/graphUtils.ts @@ -0,0 +1,287 @@ +/** + * Graph Utilities + * + * Pure functions for workflow graph operations extracted from workflowStore. + * Used by both the client-side Zustand store and the server-side headless executor. + */ + +import type { + WorkflowNode, + WorkflowNodeData, + ImageInputNodeData, + AudioInputNodeData, + AnnotationNodeData, + NanoBananaNodeData, + GenerateVideoNodeData, + VideoStitchNodeData, + EaseCurveNodeData, + PromptNodeData, + PromptConstructorNodeData, + LLMGenerateNodeData, + WebhookTriggerNodeData, + ModelInputDef, +} from "@/types"; +import type { WorkflowEdge } from "@/types/workflow"; + +/** + * Level group for parallel execution + */ +export interface LevelGroup { + level: number; + nodeIds: string[]; +} + +/** + * Return type for getConnectedInputsPure + */ +export interface ConnectedInputs { + images: string[]; + videos: string[]; + audio: string[]; + text: string | null; + dynamicInputs: Record; + easeCurve: { bezierHandles: [number, number, number, number]; easingPreset: string | null } | null; +} + +/** + * Groups nodes by dependency level using Kahn's algorithm variant. + * Nodes at the same level can be executed in parallel. 
+ * Level 0 = nodes with no incoming edges (roots) + * Level N = nodes whose dependencies are all at levels < N + */ +export function groupNodesByLevel( + nodes: WorkflowNode[], + edges: WorkflowEdge[] +): LevelGroup[] { + const inDegree = new Map(); + const adjList = new Map(); + + nodes.forEach((n) => { + inDegree.set(n.id, 0); + adjList.set(n.id, []); + }); + + edges.forEach((e) => { + inDegree.set(e.target, (inDegree.get(e.target) || 0) + 1); + adjList.get(e.source)?.push(e.target); + }); + + const levels: LevelGroup[] = []; + let currentLevel = nodes + .filter((n) => inDegree.get(n.id) === 0) + .map((n) => n.id); + + let levelNum = 0; + while (currentLevel.length > 0) { + levels.push({ level: levelNum, nodeIds: [...currentLevel] }); + + const nextLevel: string[] = []; + for (const nodeId of currentLevel) { + for (const child of adjList.get(nodeId) || []) { + const newDegree = (inDegree.get(child) || 1) - 1; + inDegree.set(child, newDegree); + if (newDegree === 0) { + nextLevel.push(child); + } + } + } + + currentLevel = nextLevel; + levelNum++; + } + + return levels; +} + +/** + * Chunk an array into smaller arrays of specified size + */ +export function chunk(array: T[], size: number): T[][] { + const chunks: T[][] = []; + for (let i = 0; i < array.length; i += size) { + chunks.push(array.slice(i, i + size)); + } + return chunks; +} + +/** + * Helper to determine if a handle ID is an image type + */ +function isImageHandle(handleId: string | null | undefined): boolean { + if (!handleId) return false; + return handleId === "image" || handleId.startsWith("image-") || handleId.includes("frame"); +} + +/** + * Helper to determine if a handle ID is a text type + */ +function isTextHandle(handleId: string | null | undefined): boolean { + if (!handleId) return false; + return handleId === "text" || handleId.startsWith("text-") || handleId.includes("prompt"); +} + +/** + * Extract output value from a source node based on its type. 
+ */ +function getSourceOutput(sourceNode: WorkflowNode): { type: "image" | "text" | "video" | "audio"; value: string | null } { + if (sourceNode.type === "imageInput") { + return { type: "image", value: (sourceNode.data as ImageInputNodeData).image }; + } else if (sourceNode.type === "audioInput") { + return { type: "audio", value: (sourceNode.data as AudioInputNodeData).audioFile }; + } else if (sourceNode.type === "annotation") { + return { type: "image", value: (sourceNode.data as AnnotationNodeData).outputImage }; + } else if (sourceNode.type === "nanoBanana") { + return { type: "image", value: (sourceNode.data as NanoBananaNodeData).outputImage }; + } else if (sourceNode.type === "generateVideo") { + return { type: "video", value: (sourceNode.data as GenerateVideoNodeData).outputVideo }; + } else if (sourceNode.type === "videoStitch") { + return { type: "video", value: (sourceNode.data as VideoStitchNodeData).outputVideo }; + } else if (sourceNode.type === "easeCurve") { + return { type: "video", value: (sourceNode.data as EaseCurveNodeData).outputVideo }; + } else if (sourceNode.type === "prompt") { + return { type: "text", value: (sourceNode.data as PromptNodeData).prompt }; + } else if (sourceNode.type === "promptConstructor") { + const pcData = sourceNode.data as PromptConstructorNodeData; + return { type: "text", value: pcData.outputText || pcData.template || null }; + } else if (sourceNode.type === "llmGenerate") { + return { type: "text", value: (sourceNode.data as LLMGenerateNodeData).outputText }; + } else if (sourceNode.type === "webhookTrigger") { + // WebhookTrigger outputs are determined by the edge's sourceHandle + // We return null here; the actual routing is done in getConnectedInputsPure + // based on the sourceHandle ID + return { type: "image", value: null }; + } + return { type: "image", value: null }; +} + +/** + * Get the output value from a webhookTrigger node for a specific source handle. 
+ */ +function getWebhookTriggerOutput( + sourceNode: WorkflowNode, + sourceHandle: string | null | undefined +): { type: "image" | "text"; value: string | null } { + const data = sourceNode.data as WebhookTriggerNodeData; + if (!sourceHandle) return { type: "image", value: null }; + + if (sourceHandle === "text") { + return { type: "text", value: data.text }; + } + + // image-0, image-1, etc. + const match = sourceHandle.match(/^image-(\d+)$/); + if (match) { + const index = parseInt(match[1], 10); + return { type: "image", value: data.images[index] ?? null }; + } + + return { type: "image", value: null }; +} + +/** + * Pure version of getConnectedInputs that works with explicit node/edge arrays. + * Used by both the Zustand store and the headless executor. + */ +export function getConnectedInputsPure( + nodeId: string, + nodes: WorkflowNode[], + edges: WorkflowEdge[] +): ConnectedInputs { + const images: string[] = []; + const videos: string[] = []; + const audio: string[] = []; + let text: string | null = null; + const dynamicInputs: Record = {}; + + // Get the target node to check for inputSchema + const targetNode = nodes.find((n) => n.id === nodeId); + const inputSchema = (targetNode?.data as { inputSchema?: ModelInputDef[] })?.inputSchema; + + // Build mapping from normalized handle IDs to schema names + const handleToSchemaName: Record = {}; + if (inputSchema && inputSchema.length > 0) { + const imageInputs = inputSchema.filter((i) => i.type === "image"); + const textInputs = inputSchema.filter((i) => i.type === "text"); + + imageInputs.forEach((input, index) => { + handleToSchemaName[`image-${index}`] = input.name; + if (index === 0) { + handleToSchemaName["image"] = input.name; + } + }); + + textInputs.forEach((input, index) => { + handleToSchemaName[`text-${index}`] = input.name; + if (index === 0) { + handleToSchemaName["text"] = input.name; + } + }); + } + + edges + .filter((edge) => edge.target === nodeId) + .forEach((edge) => { + const sourceNode = 
nodes.find((n) => n.id === edge.source); + if (!sourceNode) return; + + const handleId = edge.targetHandle; + + // Special handling for webhookTrigger source nodes + let type: "image" | "text" | "video" | "audio"; + let value: string | null; + + if (sourceNode.type === "webhookTrigger") { + const result = getWebhookTriggerOutput(sourceNode, edge.sourceHandle); + type = result.type; + value = result.value; + } else { + const result = getSourceOutput(sourceNode); + type = result.type; + value = result.value; + } + + if (!value) return; + + // Map normalized handle ID to schema name for dynamicInputs + if (handleId && handleToSchemaName[handleId]) { + const schemaName = handleToSchemaName[handleId]; + const existing = dynamicInputs[schemaName]; + if (existing !== undefined) { + dynamicInputs[schemaName] = Array.isArray(existing) + ? [...existing, value] + : [existing, value]; + } else { + dynamicInputs[schemaName] = value; + } + } + + // Route to typed arrays based on source output type + if (type === "video") { + videos.push(value); + } else if (type === "audio") { + audio.push(value); + } else if (type === "text" || isTextHandle(handleId)) { + text = value; + } else if (isImageHandle(handleId) || !handleId) { + images.push(value); + } + }); + + // Extract easeCurve data + let easeCurve: ConnectedInputs["easeCurve"] = null; + const easeCurveEdge = edges.find( + (e) => e.target === nodeId && e.targetHandle === "easeCurve" + ); + if (easeCurveEdge) { + const sourceNode = nodes.find((n) => n.id === easeCurveEdge.source); + if (sourceNode?.type === "easeCurve") { + const sourceData = sourceNode.data as EaseCurveNodeData; + easeCurve = { + bezierHandles: sourceData.bezierHandles, + easingPreset: sourceData.easingPreset, + }; + } + } + + return { images, videos, audio, text, dynamicInputs, easeCurve }; +} diff --git a/src/lib/executor/headlessExecutor.ts b/src/lib/executor/headlessExecutor.ts new file mode 100644 index 00000000..48798868 --- /dev/null +++ 
b/src/lib/executor/headlessExecutor.ts @@ -0,0 +1,497 @@ +/** + * Headless Workflow Executor + * + * Server-side execution engine that runs workflows without a browser. + * Reuses the same topological sort and data routing as the client, + * and calls the existing API routes (/api/generate, /api/llm) via fetch. + */ + +import type { + WorkflowNode, + WorkflowNodeData, + ImageInputNodeData, + AnnotationNodeData, + PromptNodeData, + PromptConstructorNodeData, + NanoBananaNodeData, + GenerateVideoNodeData, + LLMGenerateNodeData, + OutputNodeData, + WebhookTriggerNodeData, + WebhookResponseNodeData, +} from "@/types"; +import type { WorkflowEdge } from "@/types/workflow"; +import type { WebhookOutput } from "@/types/webhook"; +import { groupNodesByLevel, chunk, getConnectedInputsPure } from "./graphUtils"; + +const MAX_CONCURRENT = 3; + +/** + * Context for headless execution + */ +export interface ExecutionContext { + nodes: WorkflowNode[]; + edges: WorkflowEdge[]; + apiKeys: Record<string, string>; + baseUrl: string; // e.g., "http://localhost:3000" + signal?: AbortSignal; +} + +/** + * In-memory node state manager for headless execution. + * Tracks data updates without needing Zustand. 
+ */ +class NodeStateManager { + private overrides = new Map<string, Record<string, unknown>>(); + private nodes: WorkflowNode[]; + + constructor(nodes: WorkflowNode[]) { + this.nodes = nodes; + } + + update(nodeId: string, data: Record<string, unknown>): void { + const existing = this.overrides.get(nodeId) || {}; + this.overrides.set(nodeId, { ...existing, ...data }); + } + + /** + * Get a node with its current data (original + overrides merged) + */ + getNode(nodeId: string): WorkflowNode | undefined { + const node = this.nodes.find((n) => n.id === nodeId); + if (!node) return undefined; + + const override = this.overrides.get(nodeId); + if (!override) return node; + + return { + ...node, + data: { ...node.data, ...override } as WorkflowNodeData, + }; + } + + /** + * Get all nodes with current data + */ + getAllNodes(): WorkflowNode[] { + return this.nodes.map((node) => { + const override = this.overrides.get(node.id); + if (!override) return node; + return { + ...node, + data: { ...node.data, ...override } as WorkflowNodeData, + }; + }); + } +} + +/** + * Execute a workflow headlessly on the server. + * Returns outputs captured from webhookResponse and output nodes. + */ +export async function executeWorkflowHeadless( + ctx: ExecutionContext +): Promise<WebhookOutput[]> { + const state = new NodeStateManager(ctx.nodes); + const outputs: WebhookOutput[] = []; + + // Topological sort + const levels = groupNodesByLevel(ctx.nodes, ctx.edges); + + // Execute level by level + for (const level of levels) { + const nodeChunks = chunk(level.nodeIds, MAX_CONCURRENT); + + for (const nodeIds of nodeChunks) { + const results = await Promise.allSettled( + nodeIds.map((nodeId) => + executeSingleNode(nodeId, state, ctx, outputs) + ) + ); + + // Check for failures + for (const result of results) { + if (result.status === "rejected") { + throw new Error( + `Node execution failed: ${result.reason instanceof Error ? 
result.reason.message : String(result.reason)}` + ); + } + } + } + } + + return outputs; +} + +/** + * Execute a single node based on its type + */ +async function executeSingleNode( + nodeId: string, + state: NodeStateManager, + ctx: ExecutionContext, + outputs: WebhookOutput[] +): Promise<void> { + const node = state.getNode(nodeId); + if (!node) return; + + if (ctx.signal?.aborted) { + throw new Error("Execution aborted"); + } + + switch (node.type) { + case "imageInput": + case "audioInput": + case "webhookTrigger": + // Data sources - already populated + break; + + case "annotation": { + // Pass through image (no canvas drawing in headless mode) + const { images } = getConnectedInputsPure(nodeId, state.getAllNodes(), ctx.edges); + if (images[0]) { + state.update(nodeId, { sourceImage: images[0], outputImage: images[0] }); + } + break; + } + + case "prompt": { + const { text } = getConnectedInputsPure(nodeId, state.getAllNodes(), ctx.edges); + if (text !== null) { + state.update(nodeId, { prompt: text }); + } + break; + } + + case "promptConstructor": { + const nodeData = node.data as PromptConstructorNodeData; + const template = nodeData.template; + + // Find connected prompt nodes + const connectedPromptNodes = ctx.edges + .filter((e) => e.target === nodeId && e.targetHandle === "text") + .map((e) => state.getNode(e.source)) + .filter((n): n is WorkflowNode => n !== undefined && n.type === "prompt"); + + const variableMap: Record<string, string> = {}; + connectedPromptNodes.forEach((promptNode) => { + const promptData = promptNode.data as PromptNodeData; + if (promptData.variableName) { + variableMap[promptData.variableName] = promptData.prompt; + } + }); + + const varPattern = /@(\w+)/g; + const unresolvedVars: string[] = []; + let resolvedText = template; + + const matches = template.matchAll(varPattern); + for (const match of matches) { + const varName = match[1]; + if (variableMap[varName] !== undefined) { + resolvedText = resolvedText.replaceAll(`@${varName}`, 
variableMap[varName]); + } else if (!unresolvedVars.includes(varName)) { + unresolvedVars.push(varName); + } + } + + state.update(nodeId, { outputText: resolvedText, unresolvedVars }); + break; + } + + case "nanoBanana": { + const { images, text, dynamicInputs } = getConnectedInputsPure( + nodeId, state.getAllNodes(), ctx.edges + ); + + const promptFromDynamic = Array.isArray(dynamicInputs.prompt) + ? dynamicInputs.prompt[0] + : dynamicInputs.prompt; + const promptText = text || promptFromDynamic || null; + + if (!promptText) { + throw new Error(`nanoBanana node ${nodeId}: Missing text input`); + } + + const nodeData = node.data as NanoBananaNodeData; + + const headers = buildProviderHeaders(nodeData.selectedModel?.provider || "gemini", ctx.apiKeys); + + const response = await fetch(`${ctx.baseUrl}/api/generate`, { + method: "POST", + headers, + body: JSON.stringify({ + images, + prompt: promptText, + aspectRatio: nodeData.aspectRatio, + resolution: nodeData.resolution, + model: nodeData.model, + useGoogleSearch: nodeData.useGoogleSearch, + selectedModel: nodeData.selectedModel, + parameters: nodeData.parameters, + dynamicInputs, + }), + signal: ctx.signal, + }); + + if (!response.ok) { + const errorText = await response.text(); + throw new Error(`Generate API failed (${response.status}): ${errorText.substring(0, 200)}`); + } + + const result = await response.json(); + if (result.success && result.image) { + state.update(nodeId, { + outputImage: result.image, + ...(result.sourceUrl && { outputImageUrl: result.sourceUrl }), + status: "complete", + }); + } else { + throw new Error(`Generation failed: ${result.error || "Unknown error"}`); + } + break; + } + + case "generateVideo": { + const { images, text, dynamicInputs } = getConnectedInputsPure( + nodeId, state.getAllNodes(), ctx.edges + ); + const nodeData = node.data as GenerateVideoNodeData; + + if (!nodeData.selectedModel?.modelId) { + throw new Error(`generateVideo node ${nodeId}: No model selected`); + } + + 
const headers = buildProviderHeaders(nodeData.selectedModel.provider, ctx.apiKeys); + + const response = await fetch(`${ctx.baseUrl}/api/generate`, { + method: "POST", + headers, + body: JSON.stringify({ + images, + prompt: text, + selectedModel: nodeData.selectedModel, + parameters: nodeData.parameters, + dynamicInputs, + mediaType: "video", + }), + signal: ctx.signal, + }); + + if (!response.ok) { + const errorText = await response.text(); + throw new Error(`Generate Video API failed (${response.status}): ${errorText.substring(0, 200)}`); + } + + const result = await response.json(); + // /api/generate returns all media in result.image (even video). + // result.video/result.videoUrl may also exist for large videos returned as URL. + const videoData = result.video || result.videoUrl || result.image; + if (result.success && videoData) { + state.update(nodeId, { + outputVideo: videoData, + ...(result.sourceUrl && { outputVideoUrl: result.sourceUrl }), + status: "complete", + }); + } else { + throw new Error(`Video generation failed: ${result.error || "Unknown error"}`); + } + break; + } + + case "llmGenerate": { + const inputs = getConnectedInputsPure(nodeId, state.getAllNodes(), ctx.edges); + const llmData = node.data as LLMGenerateNodeData; + const promptText = inputs.text ?? 
llmData.inputPrompt; + + if (!promptText) { + throw new Error(`llmGenerate node ${nodeId}: Missing text input`); + } + + const headers = buildLLMHeaders(llmData.provider, ctx.apiKeys); + + const response = await fetch(`${ctx.baseUrl}/api/llm`, { + method: "POST", + headers, + body: JSON.stringify({ + prompt: promptText, + ...(inputs.images.length > 0 && { images: inputs.images }), + provider: llmData.provider, + model: llmData.model, + temperature: llmData.temperature, + maxTokens: llmData.maxTokens, + }), + signal: ctx.signal, + }); + + if (!response.ok) { + const errorText = await response.text(); + throw new Error(`LLM API failed (${response.status}): ${errorText.substring(0, 200)}`); + } + + const result = await response.json(); + if (result.success && result.text) { + state.update(nodeId, { outputText: result.text, status: "complete" }); + } else { + throw new Error(`LLM generation failed: ${result.error || "Unknown error"}`); + } + break; + } + + case "output": { + const { images, videos } = getConnectedInputsPure( + nodeId, state.getAllNodes(), ctx.edges + ); + if (videos.length > 0) { + state.update(nodeId, { image: videos[0], video: videos[0], contentType: "video" }); + outputs.push({ + nodeId, + type: "video", + base64: videos[0].startsWith("data:") ? videos[0] : undefined, + url: !videos[0].startsWith("data:") ? videos[0] : undefined, + }); + } else if (images.length > 0) { + state.update(nodeId, { image: images[0], contentType: "image" }); + outputs.push({ + nodeId, + type: "image", + base64: images[0], + }); + } + break; + } + + case "webhookResponse": { + const { images, videos, text } = getConnectedInputsPure( + nodeId, state.getAllNodes(), ctx.edges + ); + // Find upstream source URL from generate nodes connected to this response. + // Video connections arrive with targetHandle "image" (not "video"), so we + // check both outputImageUrl and outputVideoUrl on the source node. 
+ const findUpstreamMediaUrl = (): string | undefined => { + const edge = ctx.edges.find( + (e) => e.target === nodeId && (e.targetHandle === "image" || e.targetHandle === "video" || !e.targetHandle) + ); + if (!edge) return undefined; + const sourceNode = state.getNode(edge.source); + const sourceData = sourceNode?.data as Record<string, unknown> | undefined; + return (sourceData?.["outputImageUrl"] || sourceData?.["outputVideoUrl"]) as string | undefined; + }; + + const upstreamMediaUrl = findUpstreamMediaUrl(); + + if (videos.length > 0) { + state.update(nodeId, { image: videos[0], video: videos[0], contentType: "video" }); + const videoOutput = { + nodeId, + type: "video" as const, + base64: videos[0].startsWith("data:") ? videos[0] : undefined, + url: upstreamMediaUrl || (!videos[0].startsWith("data:") ? videos[0] : undefined), + }; + outputs.push(videoOutput); + } else if (images.length > 0) { + state.update(nodeId, { image: images[0], contentType: "image" }); + // Prefer the provider's original URL over base64 + outputs.push({ + nodeId, + type: "image", + base64: images[0].startsWith("data:") ? images[0] : undefined, + url: upstreamMediaUrl || (!images[0].startsWith("data:") ? 
images[0] : undefined), + }); + } + + if (text) { + state.update(nodeId, { text }); + // Only add text output if no image/video was captured + if (videos.length === 0 && images.length === 0) { + outputs.push({ + nodeId, + type: "text", + text, + }); + } + } + break; + } + + case "splitGrid": + case "outputGallery": + case "imageCompare": + case "videoStitch": + case "easeCurve": + // These nodes are not supported in headless mode + console.warn(`[HeadlessExecutor] Node type "${node.type}" is not supported in headless mode, skipping`); + break; + } +} + +/** + * Build headers with API key for generation providers + */ +function buildProviderHeaders( + provider: string, + apiKeys: Record<string, string> +): Record<string, string> { + const headers: Record<string, string> = { + "Content-Type": "application/json", + }; + + switch (provider) { + case "gemini": + if (apiKeys.gemini) headers["X-Gemini-API-Key"] = apiKeys.gemini; + break; + case "replicate": + if (apiKeys.replicate) headers["X-Replicate-API-Key"] = apiKeys.replicate; + break; + case "fal": + if (apiKeys.fal) headers["X-Fal-API-Key"] = apiKeys.fal; + break; + case "kie": + if (apiKeys.kie) headers["X-Kie-Key"] = apiKeys.kie; + break; + case "wavespeed": + if (apiKeys.wavespeed) headers["X-WaveSpeed-Key"] = apiKeys.wavespeed; + break; + } + + return headers; +} + +/** + * Build headers with API key for LLM providers + */ +function buildLLMHeaders( + provider: string, + apiKeys: Record<string, string> +): Record<string, string> { + const headers: Record<string, string> = { + "Content-Type": "application/json", + }; + + if (provider === "google" && apiKeys.gemini) { + headers["X-Gemini-API-Key"] = apiKeys.gemini; + } else if (provider === "openai" && apiKeys.openai) { + headers["X-OpenAI-API-Key"] = apiKeys.openai; + } + + return headers; +} + +/** + * Download an image from a URL and convert to base64 data URL. + * Throws an error if the download fails. 
+ */ +export async function downloadImageToBase64(url: string): Promise<string> { + // Already a data URL + if (url.startsWith("data:")) return url; + + const response = await fetch(url, { redirect: "follow" }); + if (!response.ok) { + throw new Error(`Failed to download image from ${url}: ${response.status} ${response.statusText}`); + } + + const contentType = response.headers.get("content-type") || "image/png"; + const buffer = await response.arrayBuffer(); + const base64 = Buffer.from(buffer).toString("base64"); + return `data:${contentType};base64,${base64}`; +} diff --git a/src/lib/executor/jobStore.ts b/src/lib/executor/jobStore.ts new file mode 100644 index 00000000..2e7a6329 --- /dev/null +++ b/src/lib/executor/jobStore.ts @@ -0,0 +1,74 @@ +/** + * Job Store + * + * In-memory store for tracking async webhook job execution. + * Jobs expire after 1 hour. + */ + +import type { WebhookJob, WebhookOutput } from "@/types/webhook"; + +const JOB_TTL_MS = 60 * 60 * 1000; // 1 hour + +const jobs = new Map<string, WebhookJob>(); + +/** + * Generate a unique job ID + */ +export function createJobId(): string { + return `job_${Date.now()}_${Math.random().toString(36).substring(2, 8)}`; +} + +/** + * Create a new job entry + */ +export function createJob(jobId: string, callbackUrl?: string): void { + cleanup(); + jobs.set(jobId, { + status: "processing", + createdAt: Date.now(), + callbackUrl, + }); +} + +/** + * Mark a job as completed with results + */ +export function completeJob(jobId: string, result: WebhookOutput[]): void { + const job = jobs.get(jobId); + if (job) { + job.status = "completed"; + job.result = result; + job.completedAt = Date.now(); + } +} + +/** + * Mark a job as failed with error + */ +export function failJob(jobId: string, error: string): void { + const job = jobs.get(jobId); + if (job) { + job.status = "failed"; + job.error = error; + job.completedAt = Date.now(); + } +} + +/** + * Get job status and results + */ +export function getJob(jobId: string): WebhookJob | 
undefined { + return jobs.get(jobId); +} + +/** + * Remove expired jobs + */ +function cleanup(): void { + const now = Date.now(); + for (const [id, job] of jobs) { + if (now - job.createdAt > JOB_TTL_MS) { + jobs.delete(id); + } + } +} diff --git a/src/lib/quickstart/validation.ts b/src/lib/quickstart/validation.ts index 5aa1acab..076c5b30 100644 --- a/src/lib/quickstart/validation.ts +++ b/src/lib/quickstart/validation.ts @@ -1,5 +1,6 @@ import { WorkflowFile } from "@/store/workflowStore"; import { NodeType, WorkflowNodeData } from "@/types"; +import { defaultNodeDimensions } from "@/store/utils/nodeDefaults"; interface ValidationError { path: string; @@ -24,23 +25,8 @@ const VALID_NODE_TYPES: NodeType[] = [ const VALID_HANDLE_TYPES = ["image", "text", "reference"]; -// Default node dimensions -const DEFAULT_DIMENSIONS: Record = { - imageInput: { width: 300, height: 280 }, - audioInput: { width: 300, height: 200 }, - annotation: { width: 300, height: 280 }, - prompt: { width: 320, height: 220 }, - promptConstructor: { width: 340, height: 280 }, - nanoBanana: { width: 300, height: 300 }, - generateVideo: { width: 300, height: 300 }, - llmGenerate: { width: 320, height: 360 }, - splitGrid: { width: 300, height: 320 }, - output: { width: 320, height: 320 }, - outputGallery: { width: 320, height: 360 }, - imageCompare: { width: 400, height: 360 }, - videoStitch: { width: 400, height: 280 }, - easeCurve: { width: 340, height: 480 }, -}; +// Default node dimensions - use centralized source +const DEFAULT_DIMENSIONS = defaultNodeDimensions; /** * Validate a workflow JSON object @@ -314,6 +300,19 @@ function createDefaultNodeData(type: NodeType): WorkflowNodeData { progress: 0, encoderSupported: null, }; + case "webhookTrigger": + return { + imageCount: 1, + hasTextOutput: true, + images: [null], + text: null, + }; + case "webhookResponse": + return { + image: null, + video: null, + text: null, + }; } } diff --git a/src/store/utils/nodeDefaults.ts 
b/src/store/utils/nodeDefaults.ts index 96133b82..09dd05f5 100644 --- a/src/store/utils/nodeDefaults.ts +++ b/src/store/utils/nodeDefaults.ts @@ -13,6 +13,8 @@ import { OutputGalleryNodeData, ImageCompareNodeData, EaseCurveNodeData, + WebhookTriggerNodeData, + WebhookResponseNodeData, WorkflowNodeData, GroupColor, SelectedModel, @@ -38,6 +40,8 @@ export const defaultNodeDimensions: Record { progress: 0, encoderSupported: null, } as EaseCurveNodeData; + case "webhookTrigger": + return { + imageCount: 1, + hasTextOutput: true, + images: [null], + text: null, + } as WebhookTriggerNodeData; + case "webhookResponse": + return { + image: null, + video: null, + text: null, + } as WebhookResponseNodeData; } }; diff --git a/src/store/workflowStore.ts b/src/store/workflowStore.ts index f7acc5b0..7bf3ce41 100644 --- a/src/store/workflowStore.ts +++ b/src/store/workflowStore.ts @@ -33,12 +33,19 @@ import { OutputGalleryNodeData, VideoStitchNodeData, EaseCurveNodeData, + WebhookTriggerNodeData, + WebhookResponseNodeData, } from "@/types"; import { useToast } from "@/components/Toast"; import { calculateGenerationCost } from "@/utils/costCalculator"; import { logger } from "@/utils/logger"; import { externalizeWorkflowImages, hydrateWorkflowImages } from "@/utils/imageStorage"; import { EditOperation, applyEditOperations as executeEditOps } from "@/lib/chat/editOperations"; +import { + groupNodesByLevel as groupNodesByLevelPure, + chunk as chunkPure, + getConnectedInputsPure, +} from "@/lib/executor/graphUtils"; import { loadSaveConfigs, saveSaveConfig, @@ -344,74 +351,12 @@ const saveConcurrencySetting = (value: number): void => { localStorage.setItem(CONCURRENCY_SETTINGS_KEY, String(value)); }; -// Level grouping for parallel execution -export interface LevelGroup { - level: number; - nodeIds: string[]; -} - -/** - * Groups nodes by dependency level using Kahn's algorithm variant. - * Nodes at the same level can be executed in parallel. 
- * Level 0 = nodes with no incoming edges (roots) - * Level N = nodes whose dependencies are all at levels < N - */ -function groupNodesByLevel( - nodes: WorkflowNode[], - edges: WorkflowEdge[] -): LevelGroup[] { - // Calculate in-degree for each node - const inDegree = new Map(); - const adjList = new Map(); - - nodes.forEach((n) => { - inDegree.set(n.id, 0); - adjList.set(n.id, []); - }); +// Re-export from graphUtils for backward compatibility +export type { LevelGroup } from "@/lib/executor/graphUtils"; - edges.forEach((e) => { - inDegree.set(e.target, (inDegree.get(e.target) || 0) + 1); - adjList.get(e.source)?.push(e.target); - }); - - // BFS with level tracking (Kahn's algorithm variant) - const levels: LevelGroup[] = []; - let currentLevel = nodes - .filter((n) => inDegree.get(n.id) === 0) - .map((n) => n.id); - - let levelNum = 0; - while (currentLevel.length > 0) { - levels.push({ level: levelNum, nodeIds: [...currentLevel] }); - - const nextLevel: string[] = []; - for (const nodeId of currentLevel) { - for (const child of adjList.get(nodeId) || []) { - const newDegree = (inDegree.get(child) || 1) - 1; - inDegree.set(child, newDegree); - if (newDegree === 0) { - nextLevel.push(child); - } - } - } - - currentLevel = nextLevel; - levelNum++; - } - - return levels; -} - -/** - * Chunk an array into smaller arrays of specified size - */ -function chunk(array: T[], size: number): T[][] { - const chunks: T[][] = []; - for (let i = 0; i < array.length; i += size) { - chunks.push(array.slice(i, i + size)); - } - return chunks; -} +// Use pure functions from graphUtils +const groupNodesByLevel = groupNodesByLevelPure; +const chunk = chunkPure; // Clear all imageRefs from nodes (used when saving to a different directory) /** Revoke a blob URL if the value is one, to free the underlying memory. 
*/ @@ -953,6 +898,21 @@ export const useWorkflowStore = create((set, get) => ({ return { type: "image", value: null }; }; + // Helper to get webhook trigger output based on source handle + const getWebhookTriggerOutput = (sourceNode: WorkflowNode, sourceHandle: string | null | undefined): { type: "image" | "text"; value: string | null } => { + const data = sourceNode.data as WebhookTriggerNodeData; + if (!sourceHandle) return { type: "image", value: null }; + if (sourceHandle === "text") { + return { type: "text", value: data.text }; + } + const match = sourceHandle.match(/^image-(\d+)$/); + if (match) { + const index = parseInt(match[1], 10); + return { type: "image", value: data.images[index] ?? null }; + } + return { type: "image", value: null }; + }; + edges .filter((edge) => edge.target === nodeId) .forEach((edge) => { @@ -960,7 +920,19 @@ export const useWorkflowStore = create((set, get) => ({ if (!sourceNode) return; const handleId = edge.targetHandle; - const { type, value } = getSourceOutput(sourceNode); + + // Special handling for webhookTrigger - output depends on sourceHandle + let type: "image" | "text" | "video" | "audio"; + let value: string | null; + if (sourceNode.type === "webhookTrigger") { + const result = getWebhookTriggerOutput(sourceNode, edge.sourceHandle); + type = result.type; + value = result.value; + } else { + const result = getSourceOutput(sourceNode); + type = result.type; + value = result.value; + } if (!value) return; @@ -1161,6 +1133,33 @@ export const useWorkflowStore = create((set, get) => ({ // Audio input is a data source - no execution needed break; + case "webhookTrigger": + // Data is injected by webhook API - nothing to execute in browser + break; + + case "webhookResponse": { + // Capture connected inputs for display (acts like output node in browser) + const { images, videos, text: responseText } = getConnectedInputs(node.id); + if (videos.length > 0) { + updateNodeData(node.id, { + image: videos[0], + video: videos[0], + 
contentType: "video", + } as Partial); + } else if (images.length > 0) { + updateNodeData(node.id, { + image: images[0], + contentType: "image", + } as Partial); + } + if (responseText) { + updateNodeData(node.id, { + text: responseText, + } as Partial); + } + break; + } + case "annotation": { try { // Get connected image and set as source (use first image) diff --git a/src/types/api.ts b/src/types/api.ts index b0f9f622..a84d93be 100644 --- a/src/types/api.ts +++ b/src/types/api.ts @@ -24,6 +24,7 @@ export interface GenerateResponse { image?: string; video?: string; videoUrl?: string; // For large videos, return URL directly + sourceUrl?: string; // Original provider URL before base64 conversion contentType?: "image" | "video"; error?: string; } diff --git a/src/types/index.ts b/src/types/index.ts index f0909b6e..0be8d362 100644 --- a/src/types/index.ts +++ b/src/types/index.ts @@ -14,3 +14,4 @@ export * from "./workflow"; export * from "./api"; export * from "./quickstart"; export * from "./chat"; +export * from "./webhook"; diff --git a/src/types/nodes.ts b/src/types/nodes.ts index 9cf5b111..1558c429 100644 --- a/src/types/nodes.ts +++ b/src/types/nodes.ts @@ -36,7 +36,9 @@ export type NodeType = | "outputGallery" | "imageCompare" | "videoStitch" - | "easeCurve"; + | "easeCurve" + | "webhookTrigger" + | "webhookResponse"; /** * Node execution status @@ -280,6 +282,27 @@ export interface SplitGridNodeData extends BaseNodeData { error: string | null; } +/** + * Webhook Trigger node - receives external data via webhook API + */ +export interface WebhookTriggerNodeData extends BaseNodeData { + imageCount: number; // 1-10, number of image outputs + hasTextOutput: boolean; // whether to include text output handle + images: (string | null)[]; // Injected base64 data URLs during webhook execution + text: string | null; // Injected text during webhook execution + webhookSecret?: string; // Optional auth token +} + +/** + * Webhook Response node - captures output to return 
via webhook API + */ +export interface WebhookResponseNodeData extends BaseNodeData { + image: string | null; + video?: string | null; + text?: string | null; + contentType?: "image" | "video" | "text"; +} + /** * Union of all node data types */ @@ -297,7 +320,9 @@ export type WorkflowNodeData = | OutputGalleryNodeData | ImageCompareNodeData | VideoStitchNodeData - | EaseCurveNodeData; + | EaseCurveNodeData + | WebhookTriggerNodeData + | WebhookResponseNodeData; /** * Workflow node with typed data (extended with optional groupId) diff --git a/src/types/webhook.ts b/src/types/webhook.ts new file mode 100644 index 00000000..b441db65 --- /dev/null +++ b/src/types/webhook.ts @@ -0,0 +1,71 @@ +/** + * Webhook Types + * + * Types for webhook trigger/response system including + * API request/response formats and job tracking. + */ + +/** + * Request body for the webhook endpoint + */ +export interface WebhookRequest { + images?: (string | null)[]; // URLs or base64 data URLs, max 10 + text?: string; // Prompt text + callbackUrl?: string; // For async mode + recordId?: string; // Pass-through ID (e.g. 
Airtable record ID) + apiKeys?: { + gemini?: string; + replicate?: string; + fal?: string; + kie?: string; + wavespeed?: string; + openai?: string; + }; +} + +/** + * Single output from a webhook response node + */ +export interface WebhookOutput { + nodeId: string; + type: "image" | "video" | "text"; + base64?: string; + url?: string; + text?: string; +} + +/** + * Response from the webhook endpoint (sync mode) + */ +export interface WebhookSyncResponse { + success: boolean; + outputs?: WebhookOutput[]; + error?: string; + executionTime?: number; // ms +} + +/** + * Response from the webhook endpoint (async mode) + */ +export interface WebhookAsyncResponse { + success: boolean; + jobId: string; + status: "processing"; +} + +/** + * Job status for async webhook execution + */ +export type WebhookJobStatus = "processing" | "completed" | "failed"; + +/** + * Stored job data for async tracking + */ +export interface WebhookJob { + status: WebhookJobStatus; + result?: WebhookOutput[]; + error?: string; + createdAt: number; + completedAt?: number; + callbackUrl?: string; +}