Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Filter addFiles between hooks #370

Open
wants to merge 2 commits into
base: v3
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 11 additions & 13 deletions src/Flow.js
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ export default class Flow extends Eventizer {
* @return Promise{[<FlowFile>,...]} The promise of getting an array of FlowFile.
*/
async addFiles(fileList, event = null, initFileFn = this.opts.initFileFn) {
let item, file, flowfile, uniqueIdentifier, states = [];
let item, file, uniqueIdentifier, flowFiles = [];
const iterator = this.filterFileList(fileList, event);

while ((item = iterator.next()) && !item.done) {
Expand All @@ -546,29 +546,27 @@ export default class Flow extends Eventizer {
continue;
}

// ToDo: parallelizable ?
var flowFile = new FlowFile(this, file, uniqueIdentifier),
state = flowFile.bootstrap(event, initFileFn);
states.push(state);
}
let flowFile = new FlowFile(this, file, uniqueIdentifier);
await flowFile.bootstrap(event, initFileFn);
await this.hook('file-added', flowFile, event);

var flowfiles = await Promise.all(states);
for (let ff of flowfiles) {
await this.hook('file-added', ff, event);
if(flowFile && flowFile.file) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

formatting is off, space should be placed after an if

Copy link
Collaborator

@drzraf drzraf Oct 12, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've a problem with this. When multiple files are added this is expected to be a (mostly) independent process for each of them.
Accumulating the (async) bootstrap (and possibly async initFileFn) then waiting for all of them at once is superior as it allows for parallel non-blocking initialization processes to occurs (FlowFile.bootstrap()).
Then, only file-added hooks were run sequentially (awaiting in a loop for each of them).

With this change, we'd now wait twice in a loop. It's inefficient and contrary to parallelization abilities offered by Promises.

(While talking about this, would you mind having a look at #368?)

flowFiles.push(flowFile);
}
}

await this.hook('files-added', flowfiles, event);
await this.hook('files-added', flowFiles, event);

flowfiles = flowfiles.filter(e => e && e.file);
for (let file of flowfiles) {
flowFiles = flowFiles.filter(flowFile => flowFile && flowFile.file);
for (let file of flowFiles) {
if (this.opts.singleFile && this.files.length > 0) {
await this.removeFile(this.files[0]);
}
this.files.push(file);
}
await this.hook('files-submitted', this.files, event);

return flowfiles;
return flowFiles;
}

/**
Expand Down
17 changes: 17 additions & 0 deletions test/fileAddSpec.js
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,23 @@ describe('fileAdd event', function() {
expect(valid).toBeTruthy();
});

it('should validate file-added filtering before files-added', async function() {
var valid = false;
flow.on('file-added', (flowFile) => {
if(flowFile.name === 'f2') {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

formatting is off, space should be placed after an if

delete flowFile.file;
}
});
flow.on('files-added', (files) => {
valid = files.length === 1;
});
await flow.addFiles([
new File(['file'], 'f1'),
new File(['file2'], 'f2')
]);
expect(valid).toBeTruthy();
});

it('should validate multiple filter-file hooks', async function() {
const customFunction = jasmine.createSpy('fn');
flow.on('filter-file', async () => {
Expand Down