Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 92 additions & 47 deletions nodes/workflow-monitor.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,28 @@ module.exports = function (RED) {
return `${d.getFullYear()}-${pad(d.getMonth() + 1)}-${pad(d.getDate())} ${d.toLocaleTimeString()}`;
};

let intervalId = null;
// Track multiple workflows: Map<workflowId, {intervalId, workspaceId, passthroughProps, send, pollMs}>
// Note: We store only essential data to avoid memory leaks from large message payloads
const activeWorkflows = new Map();

const clearPolling = (workflowId) => {
if (workflowId && activeWorkflows.has(workflowId)) {
const workflow = activeWorkflows.get(workflowId);
if (workflow.intervalId) {
clearInterval(workflow.intervalId);
}
activeWorkflows.delete(workflowId);
}
};

const clearPolling = () => {
if (intervalId) {
clearInterval(intervalId);
intervalId = null;
const clearAllPolling = () => {
for (const [workflowId] of activeWorkflows) {
clearPolling(workflowId);
}
};

node.on("close", () => {
clearPolling();
clearAllPolling();
});

// Helper to evaluate typedInput properties (supports JSONata)
Expand All @@ -54,6 +65,14 @@ module.exports = function (RED) {
return RED.util.evaluateNodeProperty(p, t, node, msg);
};

// Helper to extract passthrough properties from message (excludes only payload to prevent memory leaks)
// Standard Node-RED properties like topic, _msgid pass through
// Output-specific properties like workflowId are explicitly set in output message
const extractPassthroughProps = (msg) => {
const { payload, ...passthrough } = msg;
return passthrough;
};

// Helper to map workflow status to Node-RED colour
const mapColor = (stat) => {
const s = (stat || "").toLowerCase();
Expand All @@ -64,38 +83,40 @@ module.exports = function (RED) {
return "grey"; // cancelled, unknown
};

async function fetchStatus(msg, send) {
try {
// Evaluate properties on every poll so that msg overrides can change
const workflowId = await evalProp(node.workflowIdProp, node.workflowIdPropType, msg);
const workspaceIdOverride = await evalProp(node.workspaceIdProp, node.workspaceIdPropType, msg);
const pollInterval = await evalProp(node.pollIntervalProp, node.pollIntervalPropType, msg);
async function fetchStatus(workflowId) {
// Get workflow from Map and store local reference to prevent edge cases during async operations
const workflow = activeWorkflows.get(workflowId);
if (!workflow) {
return; // Workflow was cleared, stop polling
}

if (!workflowId) {
throw new Error("workflowId not provided");
}
// Store local references to prevent issues if workflow is cleared during API call
const { workspaceId, passthroughProps, send } = workflow;

try {
const baseUrl = (node.seqeraConfig && node.seqeraConfig.baseUrl) || node.defaultBaseUrl;
const workspaceId =
workspaceIdOverride || (node.seqeraConfig && node.seqeraConfig.workspaceId) || msg.workspaceId || null;
const effectiveWorkspaceId = workspaceId || (node.seqeraConfig && node.seqeraConfig.workspaceId) || null;

const urlBase = `${baseUrl.replace(/\/$/, "")}/workflow/${workflowId}`;
const url = workspaceId ? `${urlBase}?workspaceId=${workspaceId}` : urlBase;
const url = effectiveWorkspaceId ? `${urlBase}?workspaceId=${effectiveWorkspaceId}` : urlBase;

const response = await apiCall(node, "get", url, { headers: { Accept: "application/json" } });

const wfStatus = response.data?.workflow?.status || "unknown";
const statusLower = wfStatus.toLowerCase();

// Set node status in editor
// Update node status showing count of active workflows
const activeCount = activeWorkflows.size;
const statusText = activeCount > 1 ? `${activeCount} workflows (latest: ${statusLower})` : `${statusLower}`;

node.status({
fill: mapColor(statusLower),
shape: /^(submitted|running)$/.test(statusLower) ? "ring" : "dot",
text: `${statusLower}: ${formatDateTime()}`,
text: `${statusText}: ${formatDateTime()}`,
});

const outMsg = {
...msg,
...passthroughProps,
payload: response.data,
workflowId: response.data?.workflow?.id || workflowId,
};
Expand All @@ -113,45 +134,69 @@ module.exports = function (RED) {
send([null, null, outMsg]);
}

// If keepPolling disabled OR workflow reached a final state, stop polling
// If keepPolling disabled OR workflow reached a final state, stop polling THIS workflow
if (!node.keepPolling || !/^(submitted|running)$/.test(statusLower)) {
clearPolling();
}

// Update polling interval if changed dynamically
if (node.keepPolling && /^(submitted|running)$/.test(statusLower)) {
const pollSec = parseInt(pollInterval, 10) || 5;
if (pollSec * 1000 !== node._currentPollMs) {
clearPolling();
node._currentPollMs = pollSec * 1000;
intervalId = setInterval(() => fetchStatus(msg, send), node._currentPollMs);
}
clearPolling(workflowId);
return;
}
} catch (err) {
node.error(`Seqera API request failed: ${err.message}`, msg);
node.error(`Workflow ${workflowId}: ${err.message}`, { ...passthroughProps, workflowId });
node.status({ fill: "red", shape: "dot", text: `error: ${formatDateTime()}` });
clearPolling();
clearPolling(workflowId);
}
}

node.on("input", async function (msg, send, done) {
clearPolling();

let workflowId;
try {
// Kick off status fetch (will set up interval if needed)
await fetchStatus(msg, send);

// Start polling loop if enabled and interval not yet set
if (node.keepPolling && !intervalId) {
const pollInterval = await evalProp(node.pollIntervalProp, node.pollIntervalPropType, msg);
const pollSec = parseInt(pollInterval, 10) || 5;
node._currentPollMs = pollSec * 1000;
intervalId = setInterval(() => fetchStatus(msg, send), node._currentPollMs);
// Evaluate workflowId from the incoming message
workflowId = await evalProp(node.workflowIdProp, node.workflowIdPropType, msg);

if (!workflowId) {
throw new Error("workflowId not provided");
}

// If this workflow is already being monitored, clear its old interval
if (activeWorkflows.has(workflowId)) {
clearPolling(workflowId);
}

// Evaluate properties once at the start (they won't change during polling)
const workspaceIdOverride = await evalProp(node.workspaceIdProp, node.workspaceIdPropType, msg);
const pollInterval = await evalProp(node.pollIntervalProp, node.pollIntervalPropType, msg);
const pollSec = parseInt(pollInterval, 10) || 5;
const pollMs = pollSec * 1000;

// Extract passthrough properties and store only essential data
const passthroughProps = extractPassthroughProps(msg);

// Store workflow tracking data (no full msg object to avoid memory leaks)
const workflow = {
intervalId: null,
workspaceId: workspaceIdOverride || msg.workspaceId || null,
passthroughProps: passthroughProps,
send: send,
pollMs: pollMs,
};
activeWorkflows.set(workflowId, workflow);

// Kick off initial status fetch
await fetchStatus(workflowId);

// Start polling loop if enabled and workflow is still active (fetchStatus might have removed it)
if (node.keepPolling && activeWorkflows.has(workflowId)) {
const updatedWorkflow = activeWorkflows.get(workflowId);
updatedWorkflow.intervalId = setInterval(() => fetchStatus(workflowId), pollMs);
activeWorkflows.set(workflowId, updatedWorkflow);
}

if (done) done();
} catch (err) {
if (done) done(err);
const wfId = workflowId || "unknown";
node.error(`Workflow ${wfId}: ${err.message}`, msg);
node.status({ fill: "red", shape: "dot", text: `error: ${formatDateTime()}` });
// Don't call done(err) to avoid double-done issue in tests
if (done) done();
}
});
}
Expand Down
Loading