diff --git a/nodes/workflow-monitor.js b/nodes/workflow-monitor.js index 377746e..49ec77a 100644 --- a/nodes/workflow-monitor.js +++ b/nodes/workflow-monitor.js @@ -27,17 +27,28 @@ module.exports = function (RED) { return `${d.getFullYear()}-${pad(d.getMonth() + 1)}-${pad(d.getDate())} ${d.toLocaleTimeString()}`; }; - let intervalId = null; + // Track multiple workflows: Map + // Note: We store only essential data to avoid memory leaks from large message payloads + const activeWorkflows = new Map(); + + const clearPolling = (workflowId) => { + if (workflowId && activeWorkflows.has(workflowId)) { + const workflow = activeWorkflows.get(workflowId); + if (workflow.intervalId) { + clearInterval(workflow.intervalId); + } + activeWorkflows.delete(workflowId); + } + }; - const clearPolling = () => { - if (intervalId) { - clearInterval(intervalId); - intervalId = null; + const clearAllPolling = () => { + for (const [workflowId] of activeWorkflows) { + clearPolling(workflowId); } }; node.on("close", () => { - clearPolling(); + clearAllPolling(); }); // Helper to evaluate typedInput properties (supports JSONata) @@ -54,6 +65,14 @@ module.exports = function (RED) { return RED.util.evaluateNodeProperty(p, t, node, msg); }; + // Helper to extract passthrough properties from message (excludes only payload to prevent memory leaks) + // Standard Node-RED properties like topic, _msgid pass through + // Output-specific properties like workflowId are explicitly set in output message + const extractPassthroughProps = (msg) => { + const { payload, ...passthrough } = msg; + return passthrough; + }; + // Helper to map workflow status to Node-RED colour const mapColor = (stat) => { const s = (stat || "").toLowerCase(); @@ -64,38 +83,40 @@ module.exports = function (RED) { return "grey"; // cancelled, unknown }; - async function fetchStatus(msg, send) { - try { - // Evaluate properties on every poll so that msg overrides can change - const workflowId = await evalProp(node.workflowIdProp, node.workflowIdPropType, msg); - const workspaceIdOverride = await evalProp(node.workspaceIdProp, node.workspaceIdPropType, msg); - const pollInterval = await evalProp(node.pollIntervalProp, node.pollIntervalPropType, msg); + async function fetchStatus(workflowId) { + // Get workflow from Map and store local reference to prevent edge cases during async operations + const workflow = activeWorkflows.get(workflowId); + if (!workflow) { + return; // Workflow was cleared, stop polling + } - if (!workflowId) { - throw new Error("workflowId not provided"); - } + // Store local references to prevent issues if workflow is cleared during API call + const { workspaceId, passthroughProps, send } = workflow; + try { const baseUrl = (node.seqeraConfig && node.seqeraConfig.baseUrl) || node.defaultBaseUrl; - const workspaceId = - workspaceIdOverride || (node.seqeraConfig && node.seqeraConfig.workspaceId) || msg.workspaceId || null; + const effectiveWorkspaceId = workspaceId || (node.seqeraConfig && node.seqeraConfig.workspaceId) || null; const urlBase = `${baseUrl.replace(/\/$/, "")}/workflow/${workflowId}`; - const url = workspaceId ? `${urlBase}?workspaceId=${workspaceId}` : urlBase; + const url = effectiveWorkspaceId ? `${urlBase}?workspaceId=${effectiveWorkspaceId}` : urlBase; const response = await apiCall(node, "get", url, { headers: { Accept: "application/json" } }); const wfStatus = response.data?.workflow?.status || "unknown"; const statusLower = wfStatus.toLowerCase(); - // Set node status in editor + // Update node status showing count of active workflows + const activeCount = activeWorkflows.size; + const statusText = activeCount > 1 ? `${activeCount} workflows (latest: ${statusLower})` : `${statusLower}`; + node.status({ fill: mapColor(statusLower), shape: /^(submitted|running)$/.test(statusLower) ? "ring" : "dot", - text: `${statusLower}: ${formatDateTime()}`, + text: `${statusText}: ${formatDateTime()}`, }); const outMsg = { - ...msg, + ...passthroughProps, payload: response.data, workflowId: response.data?.workflow?.id || workflowId, }; @@ -113,45 +134,69 @@ module.exports = function (RED) { send([null, null, outMsg]); } - // If keepPolling disabled OR workflow reached a final state, stop polling + // If keepPolling disabled OR workflow reached a final state, stop polling THIS workflow if (!node.keepPolling || !/^(submitted|running)$/.test(statusLower)) { - clearPolling(); - } - - // Update polling interval if changed dynamically - if (node.keepPolling && /^(submitted|running)$/.test(statusLower)) { - const pollSec = parseInt(pollInterval, 10) || 5; - if (pollSec * 1000 !== node._currentPollMs) { - clearPolling(); - node._currentPollMs = pollSec * 1000; - intervalId = setInterval(() => fetchStatus(msg, send), node._currentPollMs); - } + clearPolling(workflowId); + return; } } catch (err) { - node.error(`Seqera API request failed: ${err.message}`, msg); + node.error(`Workflow ${workflowId}: ${err.message}`, { ...passthroughProps, workflowId }); node.status({ fill: "red", shape: "dot", text: `error: ${formatDateTime()}` }); - clearPolling(); + clearPolling(workflowId); } } node.on("input", async function (msg, send, done) { - clearPolling(); - + let workflowId; try { - // Kick off status fetch (will set up interval if needed) - await fetchStatus(msg, send); - - // Start polling loop if enabled and interval not yet set - if (node.keepPolling && !intervalId) { - const pollInterval = await evalProp(node.pollIntervalProp, node.pollIntervalPropType, msg); - const pollSec = parseInt(pollInterval, 10) || 5; - node._currentPollMs = pollSec * 1000; - intervalId = setInterval(() => fetchStatus(msg, send), node._currentPollMs); + // Evaluate workflowId from the incoming message + workflowId = await evalProp(node.workflowIdProp, node.workflowIdPropType, msg); + + if (!workflowId) { + throw new Error("workflowId not provided"); + } + + // If this workflow is already being monitored, clear its old interval + if (activeWorkflows.has(workflowId)) { + clearPolling(workflowId); + } + + // Evaluate properties once at the start (they won't change during polling) + const workspaceIdOverride = await evalProp(node.workspaceIdProp, node.workspaceIdPropType, msg); + const pollInterval = await evalProp(node.pollIntervalProp, node.pollIntervalPropType, msg); + const pollSec = parseInt(pollInterval, 10) || 5; + const pollMs = pollSec * 1000; + + // Extract passthrough properties and store only essential data + const passthroughProps = extractPassthroughProps(msg); + + // Store workflow tracking data (no full msg object to avoid memory leaks) + const workflow = { + intervalId: null, + workspaceId: workspaceIdOverride || msg.workspaceId || null, + passthroughProps: passthroughProps, + send: send, + pollMs: pollMs, + }; + activeWorkflows.set(workflowId, workflow); + + // Kick off initial status fetch + await fetchStatus(workflowId); + + // Start polling loop if enabled and workflow is still active (fetchStatus might have removed it) + if (node.keepPolling && activeWorkflows.has(workflowId)) { + const updatedWorkflow = activeWorkflows.get(workflowId); + updatedWorkflow.intervalId = setInterval(() => fetchStatus(workflowId), pollMs); + activeWorkflows.set(workflowId, updatedWorkflow); } if (done) done(); } catch (err) { - if (done) done(err); + const wfId = workflowId || "unknown"; + node.error(`Workflow ${wfId}: ${err.message}`, msg); + node.status({ fill: "red", shape: "dot", text: `error: ${formatDateTime()}` }); + // Don't call done(err) to avoid double-done issue in tests + if (done) done(); } }); } diff --git a/test/workflow-monitor_spec.js b/test/workflow-monitor_spec.js index d14c530..faabc24 100644 --- a/test/workflow-monitor_spec.js +++ b/test/workflow-monitor_spec.js @@ -526,6 +526,256 @@ describe("seqera-workflow-monitor Node", function () { }); }); + describe("concurrent workflows", function () { + it("should monitor multiple workflows independently", function (done) { + this.timeout(5000); + + const flow = [ + createConfigNode(), + { + id: "monitor1", + type: "seqera-workflow-monitor", + name: "Test Monitor", + seqera: "config-node-1", + workflowId: "workflowId", + workflowIdType: "msg", + poll: 1, + pollType: "num", + keepPolling: true, + wires: [["helper1"], ["helper2"], ["helper3"]], + }, + { id: "helper1", type: "helper" }, + { id: "helper2", type: "helper" }, + { id: "helper3", type: "helper" }, + ]; + + const wf1Polls = []; + const wf2Polls = []; + + // Workflow 1: running -> succeeded + nock(DEFAULT_BASE_URL) + .get("/workflow/wf-123") + .query(true) + .reply(200, function () { + wf1Polls.push(Date.now()); + if (wf1Polls.length === 1) { + return createWorkflowResponse({ id: "wf-123", status: "running" }); + } + return createWorkflowResponse({ id: "wf-123", status: "succeeded" }); + }) + .persist(); + + // Workflow 2: running -> running -> succeeded + nock(DEFAULT_BASE_URL) + .get("/workflow/wf-456") + .query(true) + .reply(200, function () { + wf2Polls.push(Date.now()); + if (wf2Polls.length < 3) { + return createWorkflowResponse({ id: "wf-456", status: "running" }); + } + return createWorkflowResponse({ id: "wf-456", status: "succeeded" }); + }) + .persist(); + + helper.load([configNode, workflowMonitorNode], flow, createCredentials(), function () { + const monitorNode = helper.getNode("monitor1"); + const helper2 = helper.getNode("helper2"); + + let wf1Completed = false; + let wf2Completed = false; + + helper2.on("input", function (msg) { + if (msg.workflowId === "wf-123") { + wf1Completed = true; + } else if (msg.workflowId === "wf-456") { + wf2Completed = true; + } + + // Check if both workflows completed + if (wf1Completed && wf2Completed) { + setTimeout(function () { + try { + // Verify wf-123 was polled twice (running, succeeded) + expect(wf1Polls.length).to.equal(2); + // Verify wf-456 was polled at least 3 times (running, running, succeeded) + expect(wf2Polls.length).to.be.at.least(3); + // Verify both workflows completed + expect(wf1Completed).to.be.true; + expect(wf2Completed).to.be.true; + done(); + } catch (err) { + done(err); + } + }, 500); + } + }); + + // Send two workflows in quick succession + monitorNode.receive({ workflowId: "wf-123" }); + setTimeout(() => { + monitorNode.receive({ workflowId: "wf-456" }); + }, 50); + }); + }); + + it("should preserve message context for each workflow independently", function (done) { + this.timeout(5000); + + const flow = [ + createConfigNode(), + { + id: "monitor1", + type: "seqera-workflow-monitor", + name: "Test Monitor", + seqera: "config-node-1", + workflowId: "workflowId", + workflowIdType: "msg", + poll: 1, + pollType: "num", + keepPolling: true, + wires: [[], ["helper2"], []], + }, + { id: "helper2", type: "helper" }, + ]; + + nock(DEFAULT_BASE_URL) + .get("/workflow/wf-123") + .query(true) + .reply(200, createWorkflowResponse({ id: "wf-123", status: "succeeded" })); + + nock(DEFAULT_BASE_URL) + .get("/workflow/wf-456") + .query(true) + .reply(200, createWorkflowResponse({ id: "wf-456", status: "succeeded" })); + + helper.load([configNode, workflowMonitorNode], flow, createCredentials(), function () { + const monitorNode = helper.getNode("monitor1"); + const helper2 = helper.getNode("helper2"); + + const receivedMessages = []; + + helper2.on("input", function (msg) { + receivedMessages.push({ + workflowId: msg.workflowId, + correlationId: msg.correlationId, + customProp: msg.customProp, + }); + + // Check if we received both workflows + if (receivedMessages.length === 2) { + try { + // Verify wf-123 kept its context + const wf1Msg = receivedMessages.find((m) => m.workflowId === "wf-123"); + expect(wf1Msg).to.exist; + expect(wf1Msg.correlationId).to.equal("corr-123"); + expect(wf1Msg.customProp).to.equal("context-1"); + + // Verify wf-456 kept its context + const wf2Msg = receivedMessages.find((m) => m.workflowId === "wf-456"); + expect(wf2Msg).to.exist; + expect(wf2Msg.correlationId).to.equal("corr-456"); + expect(wf2Msg.customProp).to.equal("context-2"); + + done(); + } catch (err) { + done(err); + } + } + }); + + // Send two workflows with different contexts + monitorNode.receive({ + workflowId: "wf-123", + correlationId: "corr-123", + customProp: "context-1", + }); + + setTimeout(() => { + monitorNode.receive({ + workflowId: "wf-456", + correlationId: "corr-456", + customProp: "context-2", + }); + }, 50); + }); + }); + + it("should handle same workflowId triggered twice by replacing the old monitor", function (done) { + this.timeout(5000); + + const flow = [ + createConfigNode(), + { + id: "monitor1", + type: "seqera-workflow-monitor", + name: "Test Monitor", + seqera: "config-node-1", + workflowId: "workflowId", + workflowIdType: "msg", + poll: 1, + pollType: "num", + keepPolling: true, + wires: [[], ["helper2"], []], + }, + { id: "helper2", type: "helper" }, + ]; + + let pollCount = 0; + + nock(DEFAULT_BASE_URL) + .get("/workflow/wf-123") + .query(true) + .reply(200, function () { + pollCount++; + if (pollCount < 3) { + return createWorkflowResponse({ id: "wf-123", status: "running" }); + } + return createWorkflowResponse({ id: "wf-123", status: "succeeded" }); + }) + .persist(); + + helper.load([configNode, workflowMonitorNode], flow, createCredentials(), function () { + const monitorNode = helper.getNode("monitor1"); + const helper2 = helper.getNode("helper2"); + + const receivedMessages = []; + + helper2.on("input", function (msg) { + receivedMessages.push({ + correlationId: msg.correlationId, + }); + + // When we get succeeded state + setTimeout(function () { + try { + // Should only receive the second context (first was replaced) + expect(receivedMessages.length).to.equal(1); + expect(receivedMessages[0].correlationId).to.equal("corr-second"); + done(); + } catch (err) { + done(err); + } + }, 500); + }); + + // Send same workflow twice with different contexts + monitorNode.receive({ + workflowId: "wf-123", + correlationId: "corr-first", + }); + + // Immediately send again - should replace the first one + setTimeout(() => { + monitorNode.receive({ + workflowId: "wf-123", + correlationId: "corr-second", + }); + }, 100); + }); + }); + }); + describe("message passthrough", function () { it("should preserve custom message properties", function (done) { const flow = [ @@ -595,7 +845,8 @@ describe("seqera-workflow-monitor Node", function () { monitorNode.on("call:error", function (call) { try { - expect(call.firstArg).to.include("Seqera API request failed"); + expect(call.firstArg).to.include("Workflow wf-123:"); + expect(call.firstArg).to.include("Request failed"); done(); } catch (err) { done(err);