diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 3f4b62f1094a..6455edce4593 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -624,6 +624,10 @@
src/test/resources/TestExtractGrok/simple_text.log
src/test/resources/TestExtractRecordSchema/name_age_schema.avsc
src/test/resources/TestForkRecord/input/complex-input-json.json
+ src/test/resources/TestForkRecord/input/complex-input-json-for-inference.json
+ src/test/resources/TestForkRecord/output/extract-address-without-parents.json
+ src/test/resources/TestForkRecord/output/extract-address-with-parents.json
+ src/test/resources/TestForkRecord/output/extract-bank-accounts-with-parents.json
src/test/resources/TestForkRecord/output/extract-transactions.json
src/test/resources/TestForkRecord/output/split-address.json
src/test/resources/TestForkRecord/output/split-transactions.json
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ForkRecord.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ForkRecord.java
index 6f3d62c9df84..a785058159f1 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ForkRecord.java
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ForkRecord.java
@@ -53,12 +53,17 @@
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.ChoiceDataType;
+import org.apache.nifi.serialization.record.type.RecordDataType;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
import java.io.IOException;
import java.io.InputStream;
@@ -67,6 +72,7 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
@@ -247,103 +253,212 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
public void process(final InputStream in) throws IOException {
try (final RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, original.getSize(), getLogger())) {
- final RecordSchema writeSchema = writerFactory.getSchema(originalAttributes, reader.getSchema());
- final OutputStream out = session.write(outFlowFile);
+ final Record firstRecord = reader.nextRecord();
- try (final RecordSetWriter recordSetWriter = writerFactory.createWriter(getLogger(), writeSchema, out, outFlowFile)) {
+ final RecordSchema readerSchema = reader.getSchema();
+ final RecordSchema configuredWriterSchema = writerFactory.getSchema(originalAttributes, readerSchema);
+
+ // we compute the write schema only if the writer is configured to inherit the
+ // reader schema
+ final RecordSchema writeSchema;
+ if (configuredWriterSchema == readerSchema) {
+ final RecordSchema derivedSchema = determineWriteSchema(firstRecord, readerSchema);
+ writeSchema = writerFactory.getSchema(originalAttributes, derivedSchema);
+ } else {
+ writeSchema = configuredWriterSchema;
+ }
+
+ try (final OutputStream out = session.write(outFlowFile);
+ final RecordSetWriter recordSetWriter = writerFactory.createWriter(getLogger(), writeSchema, out, outFlowFile)) {
recordSetWriter.beginRecordSet();
- // we read each record of the input flow file
+ if (firstRecord != null) {
+ writeForkedRecords(firstRecord, recordSetWriter, writeSchema);
+ readCount.incrementAndGet();
+ }
+
Record record;
while ((record = reader.nextRecord()) != null) {
-
readCount.incrementAndGet();
+ writeForkedRecords(record, recordSetWriter, writeSchema);
+ }
- for (RecordPath recordPath : recordPaths) {
+ final WriteResult writeResult = recordSetWriter.finishRecordSet();
- // evaluate record path in each record of the flow file
-                        Iterator<FieldValue> it = recordPath.evaluate(record).getSelectedFields().iterator();
+ try {
+ recordSetWriter.close();
+ } catch (final IOException ioe) {
+ getLogger().warn("Failed to close Writer for {}", outFlowFile);
+ }
- while (it.hasNext()) {
- FieldValue fieldValue = it.next();
- RecordFieldType fieldType = fieldValue.getField().getDataType().getFieldType();
+                    final Map<String, String> attributes = new HashMap<>();
+ writeCount.set(writeResult.getRecordCount());
+ attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
+ attributes.put(CoreAttributes.MIME_TYPE.key(), recordSetWriter.getMimeType());
+ attributes.putAll(writeResult.getAttributes());
+ session.transfer(session.putAllAttributes(outFlowFile, attributes), REL_FORK);
+ }
- // we want to have an array here, nothing else allowed
- if (fieldType != RecordFieldType.ARRAY) {
- getLogger().debug("The record path {} is matching a field of type {} when the type ARRAY is expected.", recordPath.getPath(), fieldType);
- continue;
+ } catch (final SchemaNotFoundException | MalformedRecordException e) {
+ throw new ProcessException("Could not parse incoming data: " + e.getLocalizedMessage(), e);
+ }
+ }
+
+ private RecordSchema determineWriteSchema(final Record firstRecord, final RecordSchema readerSchema) throws SchemaNotFoundException, IOException {
+ if (isSplitMode || firstRecord == null) {
+ return readerSchema;
+ }
+
+        final Map<String, RecordField> fieldMap = new LinkedHashMap<>();
+
+ for (RecordPath recordPath : recordPaths) {
+            final Iterator<FieldValue> iterator = recordPath.evaluate(firstRecord).getSelectedFields().iterator();
+ while (iterator.hasNext()) {
+ final FieldValue fieldValue = iterator.next();
+ Object fieldObject = fieldValue.getValue();
+                if (fieldObject instanceof List<?>) {
+                    fieldObject = ((List<?>) fieldObject).toArray();
+ }
+
+ DataType dataType = fieldValue.getField().getDataType();
+ if (dataType.getFieldType() == RecordFieldType.CHOICE) {
+ DataType chosen = null;
+ if (fieldObject != null) {
+ chosen = DataTypeUtils.chooseDataType(fieldObject, (ChoiceDataType) dataType);
+ }
+ if (chosen == null) {
+ for (final DataType possible : ((ChoiceDataType) dataType).getPossibleSubTypes()) {
+ if (((ArrayDataType) possible).getElementType().getFieldType() == RecordFieldType.RECORD) {
+ chosen = possible;
+ break;
}
- if (fieldValue.getValue() == null) {
- getLogger().debug("The record path {} is matching a field the value of which is null.", recordPath.getPath());
- continue;
+ if (chosen == null) {
+ chosen = possible;
}
+ }
+ }
+ if (chosen != null) {
+ dataType = chosen;
+ }
+ }
- if (isSplitMode) {
+ if (!(dataType instanceof ArrayDataType)) {
+ continue;
+ }
- Object[] items = (Object[]) fieldValue.getValue();
- for (Object item : items) {
- fieldValue.updateValue(new Object[]{item});
- recordSetWriter.write(record);
- }
+ final ArrayDataType arrayDataType = (ArrayDataType) dataType;
+ final DataType elementType = arrayDataType.getElementType();
- } else {
+ if (elementType.getFieldType() != RecordFieldType.RECORD) {
+ continue;
+ }
- // we get the type of the elements of the array
- final ArrayDataType arrayDataType = (ArrayDataType) fieldValue.getField().getDataType();
- final DataType elementType = arrayDataType.getElementType();
+ final RecordSchema elementSchema = ((RecordDataType) elementType).getChildSchema();
+ for (final RecordField elementField : elementSchema.getFields()) {
+ fieldMap.put(elementField.getFieldName(), elementField);
+ }
- // we want to have records in the array
- if (elementType.getFieldType() != RecordFieldType.RECORD) {
- getLogger().debug("The record path {} is matching an array field with values of type {} when the type RECORD is expected.",
- recordPath.getPath(), elementType.getFieldType());
- continue;
- }
+ if (addParentFields) {
+ addParentFieldSchemas(fieldMap, fieldValue);
+ }
+ }
+ }
- Object[] records = (Object[]) fieldValue.getValue();
- for (Object elementRecord : records) {
+ final RecordSchema schema = new SimpleRecordSchema(new ArrayList<>(fieldMap.values()));
+ return writerFactory.getSchema(originalAttributes, schema);
+ }
- if (elementRecord == null) {
- continue;
- }
+    private void addParentFieldSchemas(final Map<String, RecordField> fieldMap, final FieldValue fieldValue) {
+ try {
+ final FieldValue parentField = fieldValue.getParent().get();
+ final Record parentRecord = fieldValue.getParentRecord().get();
- Record recordToWrite = (Record) elementRecord;
+ for (final RecordField field : parentRecord.getSchema().getFields()) {
+ if (!field.getFieldName().equals(fieldValue.getField().getFieldName())) {
+ fieldMap.putIfAbsent(field.getFieldName(), field);
+ }
+ }
- if (addParentFields) {
- // in this case we want to recursively add the parent fields into the record to write
- // but we need to ensure that the Record has the appropriate schema for that
- recordToWrite.incorporateSchema(writeSchema);
- recursivelyAddParentFields(recordToWrite, fieldValue);
- }
+ addParentFieldSchemas(fieldMap, parentField);
+ } catch (NoSuchElementException e) {
+ return; // No parent field, nothing to do
+ }
+ }
- recordSetWriter.write(recordToWrite);
- }
+ private void writeForkedRecords(final Record record, final RecordSetWriter recordSetWriter, final RecordSchema writeSchema) throws IOException {
+ for (RecordPath recordPath : recordPaths) {
+            final Iterator<FieldValue> it = recordPath.evaluate(record).getSelectedFields().iterator();
- }
+ while (it.hasNext()) {
+ final FieldValue fieldValue = it.next();
+ Object fieldObject = fieldValue.getValue();
+                if (fieldObject instanceof List<?>) {
+                    fieldObject = ((List<?>) fieldObject).toArray();
+ }
+ DataType dataType = fieldValue.getField().getDataType();
+ if (dataType.getFieldType() == RecordFieldType.CHOICE) {
+ DataType chosen = null;
+ if (fieldObject != null) {
+ chosen = DataTypeUtils.chooseDataType(fieldObject, (ChoiceDataType) dataType);
+ }
+ if (chosen == null) {
+ for (final DataType possible : ((ChoiceDataType) dataType).getPossibleSubTypes()) {
+ if (possible.getFieldType() == RecordFieldType.ARRAY) {
+ if (((ArrayDataType) possible).getElementType().getFieldType() == RecordFieldType.RECORD) {
+ chosen = possible;
+ break;
+ }
+ if (chosen == null) {
+ chosen = possible;
+ }
+ }
}
-
+ }
+ if (chosen != null) {
+ dataType = chosen;
}
}
- final WriteResult writeResult = recordSetWriter.finishRecordSet();
-
- try {
- recordSetWriter.close();
- } catch (final IOException ioe) {
- getLogger().warn("Failed to close Writer for {}", outFlowFile);
+ if (!(dataType instanceof ArrayDataType) || fieldObject == null) {
+ getLogger().debug("The record path {} is matching a field of type {} when the type ARRAY is expected.", recordPath.getPath(), dataType.getFieldType());
+ continue;
}
-                final Map<String, String> attributes = new HashMap<>();
- writeCount.set(writeResult.getRecordCount());
- attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
- attributes.put(CoreAttributes.MIME_TYPE.key(), recordSetWriter.getMimeType());
- attributes.putAll(writeResult.getAttributes());
- session.transfer(session.putAllAttributes(outFlowFile, attributes), REL_FORK);
- }
+ if (isSplitMode) {
+ final Object[] items = (Object[]) fieldObject;
+ for (final Object item : items) {
+ fieldValue.updateValue(new Object[]{item});
+ recordSetWriter.write(record);
+ }
+ } else {
+ final ArrayDataType arrayDataType = (ArrayDataType) dataType;
+ final DataType elementType = arrayDataType.getElementType();
+
+ if (elementType.getFieldType() != RecordFieldType.RECORD) {
+ getLogger().debug("The record path {} is matching an array field with values of type {} when the type RECORD is expected.",
+ recordPath.getPath(), elementType.getFieldType());
+ continue;
+ }
- } catch (final SchemaNotFoundException | MalformedRecordException e) {
- throw new ProcessException("Could not parse incoming data: " + e.getLocalizedMessage(), e);
+ final Object[] records = (Object[]) fieldObject;
+ for (final Object elementRecord : records) {
+ if (elementRecord == null) {
+ continue;
+ }
+
+ final Record recordToWrite = (Record) elementRecord;
+
+ if (addParentFields) {
+ recordToWrite.incorporateSchema(writeSchema);
+ recursivelyAddParentFields(recordToWrite, fieldValue);
+ }
+
+ recordSetWriter.write(recordToWrite);
+ }
+ }
+ }
}
}
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestForkRecord.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestForkRecord.java
index db0fff865d64..0295d840543b 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestForkRecord.java
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestForkRecord.java
@@ -451,6 +451,96 @@ public void testExtractMode() throws InitializationException, IOException {
runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0).assertAttributeEquals("record.count", "6");
}
+ @Test
+ public void testExtractWithParentFieldsAndInferredSchema() throws Exception {
+ TestRunner runner = TestRunners.newTestRunner(new ForkRecord());
+
+ final JsonTreeReader jsonReader = new JsonTreeReader();
+ runner.addControllerService("record-reader", jsonReader);
+ runner.enableControllerService(jsonReader);
+
+ final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
+ runner.addControllerService("record-writer", jsonWriter);
+ runner.setProperty(jsonWriter, "Pretty Print JSON", "true");
+ runner.enableControllerService(jsonWriter);
+
+ runner.setProperty(ForkRecord.RECORD_READER, "record-reader");
+ runner.setProperty(ForkRecord.RECORD_WRITER, "record-writer");
+ runner.setProperty(ForkRecord.MODE, ForkRecord.MODE_EXTRACT);
+ runner.setProperty(ForkRecord.INCLUDE_PARENT_FIELDS, "true");
+ runner.setProperty("bankAccounts", "/bankAccounts");
+
+ runner.enqueue(Paths.get("src/test/resources/TestForkRecord/input/complex-input-json-for-inference.json"));
+ runner.run();
+
+ runner.assertTransferCount(ForkRecord.REL_ORIGINAL, 1);
+ runner.assertTransferCount(ForkRecord.REL_FORK, 1);
+
+ final String expectedOutput = new String(Files.readAllBytes(Paths.get("src/test/resources/TestForkRecord/output/extract-bank-accounts-with-parents.json")));
+ runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0).assertAttributeEquals("record.count", "5");
+ runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0).assertContentEquals(expectedOutput);
+ }
+
+ @Test
+ public void testExtractFieldsAndInferredSchema() throws Exception {
+ TestRunner runner = TestRunners.newTestRunner(new ForkRecord());
+
+ final JsonTreeReader jsonReader = new JsonTreeReader();
+ runner.addControllerService("record-reader", jsonReader);
+ runner.enableControllerService(jsonReader);
+
+ final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
+ runner.addControllerService("record-writer", jsonWriter);
+ runner.setProperty(jsonWriter, "Pretty Print JSON", "true");
+ runner.enableControllerService(jsonWriter);
+
+ runner.setProperty(ForkRecord.RECORD_READER, "record-reader");
+ runner.setProperty(ForkRecord.RECORD_WRITER, "record-writer");
+ runner.setProperty(ForkRecord.MODE, ForkRecord.MODE_EXTRACT);
+ runner.setProperty(ForkRecord.INCLUDE_PARENT_FIELDS, "false");
+ runner.setProperty("address", "/address");
+
+ runner.enqueue(Paths.get("src/test/resources/TestForkRecord/input/complex-input-json-for-inference.json"));
+ runner.run();
+
+ runner.assertTransferCount(ForkRecord.REL_ORIGINAL, 1);
+ runner.assertTransferCount(ForkRecord.REL_FORK, 1);
+
+ final String expectedOutput = new String(Files.readAllBytes(Paths.get("src/test/resources/TestForkRecord/output/extract-address-without-parents.json")));
+ runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0).assertAttributeEquals("record.count", "5");
+ runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0).assertContentEquals(expectedOutput);
+ }
+
+ @Test
+ public void testExtractFieldsWithParentsAndFieldConflictAndInferredSchema() throws Exception {
+ TestRunner runner = TestRunners.newTestRunner(new ForkRecord());
+
+ final JsonTreeReader jsonReader = new JsonTreeReader();
+ runner.addControllerService("record-reader", jsonReader);
+ runner.enableControllerService(jsonReader);
+
+ final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
+ runner.addControllerService("record-writer", jsonWriter);
+ runner.setProperty(jsonWriter, "Pretty Print JSON", "true");
+ runner.enableControllerService(jsonWriter);
+
+ runner.setProperty(ForkRecord.RECORD_READER, "record-reader");
+ runner.setProperty(ForkRecord.RECORD_WRITER, "record-writer");
+ runner.setProperty(ForkRecord.MODE, ForkRecord.MODE_EXTRACT);
+ runner.setProperty(ForkRecord.INCLUDE_PARENT_FIELDS, "true");
+ runner.setProperty("address", "/address");
+
+ runner.enqueue(Paths.get("src/test/resources/TestForkRecord/input/complex-input-json-for-inference.json"));
+ runner.run();
+
+ runner.assertTransferCount(ForkRecord.REL_ORIGINAL, 1);
+ runner.assertTransferCount(ForkRecord.REL_FORK, 1);
+
+ final String expectedOutput = new String(Files.readAllBytes(Paths.get("src/test/resources/TestForkRecord/output/extract-address-with-parents.json")));
+ runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0).assertAttributeEquals("record.count", "5");
+ runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0).assertContentEquals(expectedOutput);
+ }
+
private class JsonRecordReader extends AbstractControllerService implements RecordReaderFactory {
private static final JsonParserFactory jsonParserFactory = new JsonParserFactory();
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/input/complex-input-json-for-inference.json b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/input/complex-input-json-for-inference.json
new file mode 100644
index 000000000000..c7839c31257a
--- /dev/null
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/input/complex-input-json-for-inference.json
@@ -0,0 +1,116 @@
+[
+ {
+ "id": 1,
+ "name": {
+ "last": "Doe",
+ "first": "John"
+ },
+ "address": [
+ {
+ "id": "home",
+ "street": "1 nifi street",
+ "city": "nifi city"
+ }
+ ],
+ "bankAccounts": [
+ {
+ "bankID": "OneBank",
+ "IBAN": "OneIBAN",
+ "last5Transactions": [
+ {
+ "comment": "food",
+ "amount": "-450"
+ }
+ ]
+ }
+ ]
+ }, {
+ "id": 2,
+ "name": {
+ "last": "Smith",
+ "first": "John"
+ },
+ "address": [null],
+ "bankAccounts": null
+ }, {
+ "id": 3,
+ "name": {
+ "last": "Smith",
+ "first": "Jane"
+ },
+ "address": [
+ {
+ "id": "home",
+ "street": "1 nifi street",
+ "city": "nifi city"
+ }, {
+ "id": "work",
+ "street": "1 nifi avenue",
+ "city": "apache city"
+ }
+ ],
+ "bankAccounts": [
+ {
+ "bankID": "nifi bank",
+ "IBAN": "myIBAN",
+ "last5Transactions": null
+ }, {
+ "bankID": "apache bank",
+ "IBAN": "myIBAN",
+ "last5Transactions": [
+ {
+ "comment": "gas station",
+ "amount": "-45"
+ }, {
+ "comment": "hair cut",
+ "amount": "-19"
+ }
+ ]
+ }
+ ]
+ }, {
+ "id": 4,
+ "name": {
+ "last": "Clark",
+ "first": "Jane"
+ },
+ "address": [
+ {
+ "id": "home",
+ "street": "10 nifi street",
+ "city": "nifi city"
+ }, {
+ "id": "work",
+ "street": "10 nifi avenue",
+ "city": "apache city"
+ }
+ ],
+ "bankAccounts": [
+ {
+ "bankID": "nifi bank",
+ "IBAN": "myIBAN",
+ "last5Transactions": [
+ {
+ "comment": "gift",
+ "amount": "+100"
+ }, {
+ "comment": "flights",
+ "amount": "-190"
+ }
+ ]
+ }, {
+ "bankID": "apache bank",
+ "IBAN": "myIBAN",
+ "last5Transactions": [
+ {
+ "comment": "nifi tshirt",
+ "amount": "0"
+ }, {
+ "comment": "theatre",
+ "amount": "-19"
+ }
+ ]
+ }
+ ]
+ }
+]
\ No newline at end of file
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/extract-address-with-parents.json b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/extract-address-with-parents.json
new file mode 100644
index 000000000000..cea9a203abc9
--- /dev/null
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/extract-address-with-parents.json
@@ -0,0 +1,121 @@
+[ {
+ "id" : "home",
+ "street" : "1 nifi street",
+ "city" : "nifi city",
+ "name" : {
+ "last" : "Doe",
+ "first" : "John"
+ },
+ "bankAccounts" : [ {
+ "bankID" : "OneBank",
+ "IBAN" : "OneIBAN",
+ "last5Transactions" : [ {
+ "comment" : "food",
+ "amount" : "-450"
+ } ]
+ } ]
+}, {
+ "id" : "home",
+ "street" : "1 nifi street",
+ "city" : "nifi city",
+ "name" : {
+ "last" : "Smith",
+ "first" : "Jane"
+ },
+ "bankAccounts" : [ {
+ "bankID" : "nifi bank",
+ "IBAN" : "myIBAN",
+ "last5Transactions" : null
+ }, {
+ "bankID" : "apache bank",
+ "IBAN" : "myIBAN",
+ "last5Transactions" : [ {
+ "comment" : "gas station",
+ "amount" : "-45"
+ }, {
+ "comment" : "hair cut",
+ "amount" : "-19"
+ } ]
+ } ]
+}, {
+ "id" : "work",
+ "street" : "1 nifi avenue",
+ "city" : "apache city",
+ "name" : {
+ "last" : "Smith",
+ "first" : "Jane"
+ },
+ "bankAccounts" : [ {
+ "bankID" : "nifi bank",
+ "IBAN" : "myIBAN",
+ "last5Transactions" : null
+ }, {
+ "bankID" : "apache bank",
+ "IBAN" : "myIBAN",
+ "last5Transactions" : [ {
+ "comment" : "gas station",
+ "amount" : "-45"
+ }, {
+ "comment" : "hair cut",
+ "amount" : "-19"
+ } ]
+ } ]
+}, {
+ "id" : "home",
+ "street" : "10 nifi street",
+ "city" : "nifi city",
+ "name" : {
+ "last" : "Clark",
+ "first" : "Jane"
+ },
+ "bankAccounts" : [ {
+ "bankID" : "nifi bank",
+ "IBAN" : "myIBAN",
+ "last5Transactions" : [ {
+ "comment" : "gift",
+ "amount" : "+100"
+ }, {
+ "comment" : "flights",
+ "amount" : "-190"
+ } ]
+ }, {
+ "bankID" : "apache bank",
+ "IBAN" : "myIBAN",
+ "last5Transactions" : [ {
+ "comment" : "nifi tshirt",
+ "amount" : "0"
+ }, {
+ "comment" : "theatre",
+ "amount" : "-19"
+ } ]
+ } ]
+}, {
+ "id" : "work",
+ "street" : "10 nifi avenue",
+ "city" : "apache city",
+ "name" : {
+ "last" : "Clark",
+ "first" : "Jane"
+ },
+ "bankAccounts" : [ {
+ "bankID" : "nifi bank",
+ "IBAN" : "myIBAN",
+ "last5Transactions" : [ {
+ "comment" : "gift",
+ "amount" : "+100"
+ }, {
+ "comment" : "flights",
+ "amount" : "-190"
+ } ]
+ }, {
+ "bankID" : "apache bank",
+ "IBAN" : "myIBAN",
+ "last5Transactions" : [ {
+ "comment" : "nifi tshirt",
+ "amount" : "0"
+ }, {
+ "comment" : "theatre",
+ "amount" : "-19"
+ } ]
+ } ]
+} ]
\ No newline at end of file
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/extract-address-without-parents.json b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/extract-address-without-parents.json
new file mode 100644
index 000000000000..34c1836cab68
--- /dev/null
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/extract-address-without-parents.json
@@ -0,0 +1,21 @@
+[ {
+ "id" : "home",
+ "street" : "1 nifi street",
+ "city" : "nifi city"
+}, {
+ "id" : "home",
+ "street" : "1 nifi street",
+ "city" : "nifi city"
+}, {
+ "id" : "work",
+ "street" : "1 nifi avenue",
+ "city" : "apache city"
+}, {
+ "id" : "home",
+ "street" : "10 nifi street",
+ "city" : "nifi city"
+}, {
+ "id" : "work",
+ "street" : "10 nifi avenue",
+ "city" : "apache city"
+} ]
\ No newline at end of file
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/extract-bank-accounts-with-parents.json b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/extract-bank-accounts-with-parents.json
new file mode 100644
index 000000000000..6ebc4eeee0fa
--- /dev/null
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/extract-bank-accounts-with-parents.json
@@ -0,0 +1,108 @@
+[ {
+ "bankID" : "OneBank",
+ "IBAN" : "OneIBAN",
+ "last5Transactions" : [ {
+ "comment" : "food",
+ "amount" : "-450"
+ } ],
+ "id" : 1,
+ "name" : {
+ "last" : "Doe",
+ "first" : "John"
+ },
+ "address" : [ {
+ "id" : "home",
+ "street" : "1 nifi street",
+ "city" : "nifi city"
+ } ]
+}, {
+ "bankID" : "nifi bank",
+ "IBAN" : "myIBAN",
+ "last5Transactions" : null,
+ "id" : 3,
+ "name" : {
+ "last" : "Smith",
+ "first" : "Jane"
+ },
+ "address" : [ {
+ "id" : "home",
+ "street" : "1 nifi street",
+ "city" : "nifi city"
+ }, {
+ "id" : "work",
+ "street" : "1 nifi avenue",
+ "city" : "apache city"
+ } ]
+}, {
+ "bankID" : "apache bank",
+ "IBAN" : "myIBAN",
+ "last5Transactions" : [ {
+ "comment" : "gas station",
+ "amount" : "-45"
+ }, {
+ "comment" : "hair cut",
+ "amount" : "-19"
+ } ],
+ "id" : 3,
+ "name" : {
+ "last" : "Smith",
+ "first" : "Jane"
+ },
+ "address" : [ {
+ "id" : "home",
+ "street" : "1 nifi street",
+ "city" : "nifi city"
+ }, {
+ "id" : "work",
+ "street" : "1 nifi avenue",
+ "city" : "apache city"
+ } ]
+}, {
+ "bankID" : "nifi bank",
+ "IBAN" : "myIBAN",
+ "last5Transactions" : [ {
+ "comment" : "gift",
+ "amount" : "+100"
+ }, {
+ "comment" : "flights",
+ "amount" : "-190"
+ } ],
+ "id" : 4,
+ "name" : {
+ "last" : "Clark",
+ "first" : "Jane"
+ },
+ "address" : [ {
+ "id" : "home",
+ "street" : "10 nifi street",
+ "city" : "nifi city"
+ }, {
+ "id" : "work",
+ "street" : "10 nifi avenue",
+ "city" : "apache city"
+ } ]
+}, {
+ "bankID" : "apache bank",
+ "IBAN" : "myIBAN",
+ "last5Transactions" : [ {
+ "comment" : "nifi tshirt",
+ "amount" : "0"
+ }, {
+ "comment" : "theatre",
+ "amount" : "-19"
+ } ],
+ "id" : 4,
+ "name" : {
+ "last" : "Clark",
+ "first" : "Jane"
+ },
+ "address" : [ {
+ "id" : "home",
+ "street" : "10 nifi street",
+ "city" : "nifi city"
+ }, {
+ "id" : "work",
+ "street" : "10 nifi avenue",
+ "city" : "apache city"
+ } ]
+} ]
\ No newline at end of file