Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions nodes/datalink-poll.html
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,19 @@

### Outputs

The node has two outputs:
The node has three outputs:

1. All results on every poll.
2. New objects since the previous poll (nothing sent if no new objects).
1. **All results** - Fires on every poll with all current files.
2. **New or modified** - Fires only when files are new or have changed (based on metadata: lastModified, size, or etag). Files with the same name but different timestamps are detected as new.
3. **Deleted** - Fires only when files that were present in the previous poll are no longer found.

Both outputs have the following properties:
All outputs have the following properties:

: payload (array) : Fle information aggregated from the API (array of objects).
: files (array) : File names (array of strings).
: payload.files (array) : File information aggregated from the API (array of objects).
: payload.resourceType (string) : Type of the Data Link resource.
: payload.resourceRef (string) : Resource reference path.
: payload.provider (string) : Cloud provider name.
: files (array) : File paths as strings (array).

All typed-input fields are identical to the _List files_ node with the addition of **poll frequency**.
</script>
Expand All @@ -97,14 +101,14 @@
category: "seqera",
color: "#A9A1C6",
inputs: 0,
outputs: 2,
outputs: 3,
icon: "icons/data-explorer.svg",
align: "left",
paletteLabel: "Poll files",
label: function () {
return this.name || "Poll files";
},
outputLabels: ["All objects", "Only new objects"],
outputLabels: ["All objects", "New or modified", "Deleted"],
defaults: {
name: { value: "" },
seqera: { value: "", type: "seqera-config" },
Expand Down
64 changes: 53 additions & 11 deletions nodes/datalink-poll.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,21 @@ module.exports = function (RED) {
return `${d.getFullYear()}-${pad(d.getMonth() + 1)}-${pad(d.getDate())} ${d.toLocaleTimeString()}`;
};

// Internal cache of previously seen object names
let previousNamesSet = null;
// Helper to create unique identifier for a file based on name + metadata
// This ensures files with same name but different timestamps/size/etag are detected as new
const getFileIdentifier = (item) => {
const parts = [item.name];
if (item.lastModified) parts.push(item.lastModified);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We think that item.lastModified was probably hallucinated.

if (item.size != null) parts.push(String(item.size));
if (item.etag) parts.push(item.etag);
return parts.join("|");
};

// Internal cache of previously seen objects
// Store both identifiers (for change detection) and a map by name (for deletion detection)
let previousIdentifiersSet = null;
let previousItemsMap = null;
let intervalId = null;

// Polling function
const executePoll = async () => {
Expand All @@ -78,28 +91,57 @@ module.exports = function (RED) {
files: result.files.map((it) => `${result.resourceRef}/${it}`),
};

// Second output: only new items since previous poll
// Build current state for comparison
const currentIdentifiers = new Set(result.items.map(getFileIdentifier));
const currentNameToItem = new Map(result.items.map((it) => [it.name, it]));

// Second output: new or modified items since previous poll
let msgNew = null;
if (previousNamesSet) {
const newItems = result.items.filter((it) => !previousNamesSet.has(it.name));
if (newItems.length) {
if (previousIdentifiersSet) {
const newOrModified = result.items.filter((it) => !previousIdentifiersSet.has(getFileIdentifier(it)));
if (newOrModified.length) {
msgNew = {
...pollMsg,
payload: {
files: newOrModified,
resourceType: result.resourceType,
resourceRef: result.resourceRef,
provider: result.provider,
},
files: newOrModified.map((it) => `${result.resourceRef}/${it.name}`),
};
}
}

// Third output: deleted items (present in previous poll but not current)
let msgDeleted = null;
if (previousItemsMap) {
const deletedItems = [];
for (const [name, item] of previousItemsMap.entries()) {
if (!currentNameToItem.has(name)) {
deletedItems.push(item);
}
}
if (deletedItems.length) {
msgDeleted = {
...pollMsg,
payload: {
files: newItems,
files: deletedItems,
resourceType: result.resourceType,
resourceRef: result.resourceRef,
provider: result.provider,
},
files: newItems.map((it) => `${result.resourceRef}/${it.name}`),
files: deletedItems.map((it) => `${result.resourceRef}/${it.name}`),
};
}
}

// Update cache
previousNamesSet = new Set(result.items.map((it) => it.name));
previousIdentifiersSet = currentIdentifiers;
previousItemsMap = currentNameToItem;

node.status({ fill: "green", shape: "dot", text: `${result.items.length} items: ${formatDateTime()}` });
node.send([msgAll, msgNew]);
node.send([msgAll, msgNew, msgDeleted]);
} catch (err) {
node.error(`Seqera datalink poll failed: ${err.message}`);
node.status({ fill: "red", shape: "dot", text: `error: ${formatDateTime()}` });
Expand All @@ -109,7 +151,7 @@ module.exports = function (RED) {
// Start the polling interval
if (node.seqeraConfig && config.dataLinkName && config.dataLinkName.trim() !== "") {
const intervalMs = node.pollFrequencySec * 1000;
const intervalId = setInterval(executePoll, intervalMs);
intervalId = setInterval(executePoll, intervalMs);
// run once immediately
executePoll();
}
Expand Down