From ec33cf3e9e4d3ade3186f4cd7af2674cd65ca01f Mon Sep 17 00:00:00 2001 From: Pierre Villard Date: Sun, 3 Aug 2025 19:54:02 +0200 Subject: [PATCH] NIFI-8269 - Add support for schema inference in ForkRecord processor when extracting array records * only compute writer schema from reader schema if writer is configured to inherit the schema from the reader --- .../nifi-standard-processors/pom.xml | 4 + .../nifi/processors/standard/ForkRecord.java | 245 +++++++++++++----- .../processors/standard/TestForkRecord.java | 90 +++++++ .../complex-input-json-for-inference.json | 116 +++++++++ .../output/extract-address-with-parents.json | 121 +++++++++ .../extract-address-without-parents.json | 21 ++ .../extract-bank-accounts-with-parents.json | 108 ++++++++ 7 files changed, 640 insertions(+), 65 deletions(-) create mode 100644 nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/input/complex-input-json-for-inference.json create mode 100644 nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/extract-address-with-parents.json create mode 100644 nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/extract-address-without-parents.json create mode 100644 nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/extract-bank-accounts-with-parents.json diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml index 3f4b62f1094a..6455edce4593 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml @@ -624,6 +624,10 @@ src/test/resources/TestExtractGrok/simple_text.log src/test/resources/TestExtractRecordSchema/name_age_schema.avsc src/test/resources/TestForkRecord/input/complex-input-json.json + src/test/resources/TestForkRecord/input/complex-input-json-for-inference.json + src/test/resources/TestForkRecord/output/extract-address-without-parents.json + src/test/resources/TestForkRecord/output/extract-address-with-parents.json + src/test/resources/TestForkRecord/output/extract-bank-accounts-with-parents.json src/test/resources/TestForkRecord/output/extract-transactions.json src/test/resources/TestForkRecord/output/split-address.json src/test/resources/TestForkRecord/output/split-transactions.json diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ForkRecord.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ForkRecord.java index 6f3d62c9df84..a785058159f1 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ForkRecord.java +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ForkRecord.java @@ -53,12 +53,17 @@ import org.apache.nifi.serialization.RecordReaderFactory; import org.apache.nifi.serialization.RecordSetWriter; import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.WriteResult; import org.apache.nifi.serialization.record.DataType; import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.type.ArrayDataType; +import org.apache.nifi.serialization.record.type.ChoiceDataType; +import org.apache.nifi.serialization.record.type.RecordDataType; +import org.apache.nifi.serialization.record.util.DataTypeUtils; import java.io.IOException; import java.io.InputStream; @@ -67,6 +72,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; @@ -247,103 +253,212 @@ public void onTrigger(final ProcessContext context, final ProcessSession session public void process(final InputStream in) throws IOException { try (final RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, original.getSize(), getLogger())) { - final RecordSchema writeSchema = writerFactory.getSchema(originalAttributes, reader.getSchema()); - final OutputStream out = session.write(outFlowFile); + final Record firstRecord = reader.nextRecord(); - try (final RecordSetWriter recordSetWriter = writerFactory.createWriter(getLogger(), writeSchema, out, outFlowFile)) { + final RecordSchema readerSchema = reader.getSchema(); + final RecordSchema configuredWriterSchema = writerFactory.getSchema(originalAttributes, readerSchema); + + // we compute the write schema only if the writer is configured to inherit the + // reader schema + final RecordSchema writeSchema; + if (configuredWriterSchema == readerSchema) { + final RecordSchema derivedSchema = determineWriteSchema(firstRecord, readerSchema); + writeSchema = writerFactory.getSchema(originalAttributes, derivedSchema); + } else { + writeSchema = configuredWriterSchema; + } + + try (final OutputStream out = session.write(outFlowFile); + final RecordSetWriter recordSetWriter = writerFactory.createWriter(getLogger(), writeSchema, out, outFlowFile)) { recordSetWriter.beginRecordSet(); - // we read each record of the input flow file + if (firstRecord != null) { + writeForkedRecords(firstRecord, recordSetWriter, writeSchema); + readCount.incrementAndGet(); + } + Record record; while ((record = reader.nextRecord()) != null) { - readCount.incrementAndGet(); + writeForkedRecords(record, recordSetWriter, writeSchema); + } - for (RecordPath recordPath : recordPaths) { + final WriteResult writeResult = recordSetWriter.finishRecordSet(); - // evaluate record path in each record of the flow file - Iterator it = recordPath.evaluate(record).getSelectedFields().iterator(); + try { + recordSetWriter.close(); + } catch (final IOException ioe) { + getLogger().warn("Failed to close Writer for {}", outFlowFile); + } - while (it.hasNext()) { - FieldValue fieldValue = it.next(); - RecordFieldType fieldType = fieldValue.getField().getDataType().getFieldType(); + final Map attributes = new HashMap<>(); + writeCount.set(writeResult.getRecordCount()); + attributes.put("record.count", String.valueOf(writeResult.getRecordCount())); + attributes.put(CoreAttributes.MIME_TYPE.key(), recordSetWriter.getMimeType()); + attributes.putAll(writeResult.getAttributes()); + session.transfer(session.putAllAttributes(outFlowFile, attributes), REL_FORK); + } - // we want to have an array here, nothing else allowed - if (fieldType != RecordFieldType.ARRAY) { - getLogger().debug("The record path {} is matching a field of type {} when the type ARRAY is expected.", recordPath.getPath(), fieldType); - continue; + } catch (final SchemaNotFoundException | MalformedRecordException e) { + throw new ProcessException("Could not parse incoming data: " + e.getLocalizedMessage(), e); + } + } + + private RecordSchema determineWriteSchema(final Record firstRecord, final RecordSchema readerSchema) throws SchemaNotFoundException, IOException { + if (isSplitMode || firstRecord == null) { + return readerSchema; + } + + final Map fieldMap = new LinkedHashMap<>(); + + for (RecordPath recordPath : recordPaths) { + final Iterator iterator = recordPath.evaluate(firstRecord).getSelectedFields().iterator(); + while (iterator.hasNext()) { + final FieldValue fieldValue = iterator.next(); + Object fieldObject = fieldValue.getValue(); + if (fieldObject instanceof List) { + fieldObject = ((List) fieldObject).toArray(); + } + + DataType dataType = fieldValue.getField().getDataType(); + if (dataType.getFieldType() == RecordFieldType.CHOICE) { + DataType chosen = null; + if (fieldObject != null) { + chosen = DataTypeUtils.chooseDataType(fieldObject, (ChoiceDataType) dataType); + } + if (chosen == null) { + for (final DataType possible : ((ChoiceDataType) dataType).getPossibleSubTypes()) { + if (((ArrayDataType) possible).getElementType().getFieldType() == RecordFieldType.RECORD) { + chosen = possible; + break; } - if (fieldValue.getValue() == null) { - getLogger().debug("The record path {} is matching a field the value of which is null.", recordPath.getPath()); - continue; + if (chosen == null) { + chosen = possible; } + } + } + if (chosen != null) { + dataType = chosen; + } + } - if (isSplitMode) { + if (!(dataType instanceof ArrayDataType)) { + continue; + } - Object[] items = (Object[]) fieldValue.getValue(); - for (Object item : items) { - fieldValue.updateValue(new Object[]{item}); - recordSetWriter.write(record); - } + final ArrayDataType arrayDataType = (ArrayDataType) dataType; + final DataType elementType = arrayDataType.getElementType(); - } else { + if (elementType.getFieldType() != RecordFieldType.RECORD) { + continue; + } - // we get the type of the elements of the array - final ArrayDataType arrayDataType = (ArrayDataType) fieldValue.getField().getDataType(); - final DataType elementType = arrayDataType.getElementType(); + final RecordSchema elementSchema = ((RecordDataType) elementType).getChildSchema(); + for (final RecordField elementField : elementSchema.getFields()) { + fieldMap.put(elementField.getFieldName(), elementField); + } - // we want to have records in the array - if (elementType.getFieldType() != RecordFieldType.RECORD) { - getLogger().debug("The record path {} is matching an array field with values of type {} when the type RECORD is expected.", - recordPath.getPath(), elementType.getFieldType()); - continue; - } + if (addParentFields) { + addParentFieldSchemas(fieldMap, fieldValue); + } + } + } - Object[] records = (Object[]) fieldValue.getValue(); - for (Object elementRecord : records) { + final RecordSchema schema = new SimpleRecordSchema(new ArrayList<>(fieldMap.values())); + return writerFactory.getSchema(originalAttributes, schema); + } - if (elementRecord == null) { - continue; - } + private void addParentFieldSchemas(final Map fieldMap, final FieldValue fieldValue) { + try { + final FieldValue parentField = fieldValue.getParent().get(); + final Record parentRecord = fieldValue.getParentRecord().get(); - Record recordToWrite = (Record) elementRecord; + for (final RecordField field : parentRecord.getSchema().getFields()) { + if (!field.getFieldName().equals(fieldValue.getField().getFieldName())) { + fieldMap.putIfAbsent(field.getFieldName(), field); + } + } - if (addParentFields) { - // in this case we want to recursively add the parent fields into the record to write - // but we need to ensure that the Record has the appropriate schema for that - recordToWrite.incorporateSchema(writeSchema); - recursivelyAddParentFields(recordToWrite, fieldValue); - } + addParentFieldSchemas(fieldMap, parentField); + } catch (NoSuchElementException e) { + return; // No parent field, nothing to do + } + } - recordSetWriter.write(recordToWrite); - } + private void writeForkedRecords(final Record record, final RecordSetWriter recordSetWriter, final RecordSchema writeSchema) throws IOException { + for (RecordPath recordPath : recordPaths) { + final Iterator it = recordPath.evaluate(record).getSelectedFields().iterator(); - } + while (it.hasNext()) { + final FieldValue fieldValue = it.next(); + Object fieldObject = fieldValue.getValue(); + if (fieldObject instanceof List) { + fieldObject = ((List) fieldObject).toArray(); + } + DataType dataType = fieldValue.getField().getDataType(); + if (dataType.getFieldType() == RecordFieldType.CHOICE) { + DataType chosen = null; + if (fieldObject != null) { + chosen = DataTypeUtils.chooseDataType(fieldObject, (ChoiceDataType) dataType); + } + if (chosen == null) { + for (final DataType possible : ((ChoiceDataType) dataType).getPossibleSubTypes()) { + if (possible.getFieldType() == RecordFieldType.ARRAY) { + if (((ArrayDataType) possible).getElementType().getFieldType() == RecordFieldType.RECORD) { + chosen = possible; + break; + } + if (chosen == null) { + chosen = possible; + } + } } - + } + if (chosen != null) { + dataType = chosen; } } - final WriteResult writeResult = recordSetWriter.finishRecordSet(); - - try { - recordSetWriter.close(); - } catch (final IOException ioe) { - getLogger().warn("Failed to close Writer for {}", outFlowFile); + if (!(dataType instanceof ArrayDataType) || fieldObject == null) { + getLogger().debug("The record path {} is matching a field of type {} when the type ARRAY is expected.", recordPath.getPath(), dataType.getFieldType()); + continue; } - final Map attributes = new HashMap<>(); - writeCount.set(writeResult.getRecordCount()); - attributes.put("record.count", String.valueOf(writeResult.getRecordCount())); - attributes.put(CoreAttributes.MIME_TYPE.key(), recordSetWriter.getMimeType()); - attributes.putAll(writeResult.getAttributes()); - session.transfer(session.putAllAttributes(outFlowFile, attributes), REL_FORK); - } + if (isSplitMode) { + final Object[] items = (Object[]) fieldObject; + for (final Object item : items) { + fieldValue.updateValue(new Object[]{item}); + recordSetWriter.write(record); + } + } else { + final ArrayDataType arrayDataType = (ArrayDataType) dataType; + final DataType elementType = arrayDataType.getElementType(); + + if (elementType.getFieldType() != RecordFieldType.RECORD) { + getLogger().debug("The record path {} is matching an array field with values of type {} when the type RECORD is expected.", + recordPath.getPath(), elementType.getFieldType()); + continue; + } - } catch (final SchemaNotFoundException | MalformedRecordException e) { - throw new ProcessException("Could not parse incoming data: " + e.getLocalizedMessage(), e); + final Object[] records = (Object[]) fieldObject; + for (final Object elementRecord : records) { + if (elementRecord == null) { + continue; + } + + final Record recordToWrite = (Record) elementRecord; + + if (addParentFields) { + recordToWrite.incorporateSchema(writeSchema); + recursivelyAddParentFields(recordToWrite, fieldValue); + } + + recordSetWriter.write(recordToWrite); + } + } + } } } diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestForkRecord.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestForkRecord.java index db0fff865d64..0295d840543b 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestForkRecord.java +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestForkRecord.java @@ -451,6 +451,96 @@ public void testExtractMode() throws InitializationException, IOException { runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0).assertAttributeEquals("record.count", "6"); } + @Test + public void testExtractWithParentFieldsAndInferredSchema() throws Exception { + TestRunner runner = TestRunners.newTestRunner(new ForkRecord()); + + final JsonTreeReader jsonReader = new JsonTreeReader(); + runner.addControllerService("record-reader", jsonReader); + runner.enableControllerService(jsonReader); + + final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter(); + runner.addControllerService("record-writer", jsonWriter); + runner.setProperty(jsonWriter, "Pretty Print JSON", "true"); + runner.enableControllerService(jsonWriter); + + runner.setProperty(ForkRecord.RECORD_READER, "record-reader"); + runner.setProperty(ForkRecord.RECORD_WRITER, "record-writer"); + runner.setProperty(ForkRecord.MODE, ForkRecord.MODE_EXTRACT); + runner.setProperty(ForkRecord.INCLUDE_PARENT_FIELDS, "true"); + runner.setProperty("bankAccounts", "/bankAccounts"); + + runner.enqueue(Paths.get("src/test/resources/TestForkRecord/input/complex-input-json-for-inference.json")); + runner.run(); + + runner.assertTransferCount(ForkRecord.REL_ORIGINAL, 1); + runner.assertTransferCount(ForkRecord.REL_FORK, 1); + + final String expectedOutput = new String(Files.readAllBytes(Paths.get("src/test/resources/TestForkRecord/output/extract-bank-accounts-with-parents.json"))); + runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0).assertAttributeEquals("record.count", "5"); + runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0).assertContentEquals(expectedOutput); + } + + @Test + public void testExtractFieldsAndInferredSchema() throws Exception { + TestRunner runner = TestRunners.newTestRunner(new ForkRecord()); + + final JsonTreeReader jsonReader = new JsonTreeReader(); + runner.addControllerService("record-reader", jsonReader); + runner.enableControllerService(jsonReader); + + final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter(); + runner.addControllerService("record-writer", jsonWriter); + runner.setProperty(jsonWriter, "Pretty Print JSON", "true"); + runner.enableControllerService(jsonWriter); + + runner.setProperty(ForkRecord.RECORD_READER, "record-reader"); + runner.setProperty(ForkRecord.RECORD_WRITER, "record-writer"); + runner.setProperty(ForkRecord.MODE, ForkRecord.MODE_EXTRACT); + runner.setProperty(ForkRecord.INCLUDE_PARENT_FIELDS, "false"); + runner.setProperty("address", "/address"); + + runner.enqueue(Paths.get("src/test/resources/TestForkRecord/input/complex-input-json-for-inference.json")); + runner.run(); + + runner.assertTransferCount(ForkRecord.REL_ORIGINAL, 1); + runner.assertTransferCount(ForkRecord.REL_FORK, 1); + + final String expectedOutput = new String(Files.readAllBytes(Paths.get("src/test/resources/TestForkRecord/output/extract-address-without-parents.json"))); + runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0).assertAttributeEquals("record.count", "5"); + runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0).assertContentEquals(expectedOutput); + } + + @Test + public void testExtractFieldsWithParentsAndFieldConflictAndInferredSchema() throws Exception { + TestRunner runner = TestRunners.newTestRunner(new ForkRecord()); + + final JsonTreeReader jsonReader = new JsonTreeReader(); + runner.addControllerService("record-reader", jsonReader); + runner.enableControllerService(jsonReader); + + final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter(); + runner.addControllerService("record-writer", jsonWriter); + runner.setProperty(jsonWriter, "Pretty Print JSON", "true"); + runner.enableControllerService(jsonWriter); + + runner.setProperty(ForkRecord.RECORD_READER, "record-reader"); + runner.setProperty(ForkRecord.RECORD_WRITER, "record-writer"); + runner.setProperty(ForkRecord.MODE, ForkRecord.MODE_EXTRACT); + runner.setProperty(ForkRecord.INCLUDE_PARENT_FIELDS, "true"); + runner.setProperty("address", "/address"); + + runner.enqueue(Paths.get("src/test/resources/TestForkRecord/input/complex-input-json-for-inference.json")); + runner.run(); + + runner.assertTransferCount(ForkRecord.REL_ORIGINAL, 1); + runner.assertTransferCount(ForkRecord.REL_FORK, 1); + + final String expectedOutput = new String(Files.readAllBytes(Paths.get("src/test/resources/TestForkRecord/output/extract-address-with-parents.json"))); + runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0).assertAttributeEquals("record.count", "5"); + runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0).assertContentEquals(expectedOutput); + } + private class JsonRecordReader extends AbstractControllerService implements RecordReaderFactory { private static final JsonParserFactory jsonParserFactory = new JsonParserFactory(); diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/input/complex-input-json-for-inference.json b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/input/complex-input-json-for-inference.json new file mode 100644 index 000000000000..c7839c31257a --- /dev/null +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/input/complex-input-json-for-inference.json @@ -0,0 +1,116 @@ +[ + { + "id": 1, + "name": { + "last": "Doe", + "first": "John" + }, + "address": [ + { + "id": "home", + "street": "1 nifi street", + "city": "nifi city" + } + ], + "bankAccounts": [ + { + "bankID": "OneBank", + "IBAN": "OneIBAN", + "last5Transactions": [ + { + "comment": "food", + "amount": "-450" + } + ] + } + ] + }, { + "id": 2, + "name": { + "last": "Smith", + "first": "John" + }, + "address": [null], + "bankAccounts": null + }, { + "id": 3, + "name": { + "last": "Smith", + "first": "Jane" + }, + "address": [ + { + "id": "home", + "street": "1 nifi street", + "city": "nifi city" + }, { + "id": "work", + "street": "1 nifi avenue", + "city": "apache city" + } + ], + "bankAccounts": [ + { + "bankID": "nifi bank", + "IBAN": "myIBAN", + "last5Transactions": null + }, { + "bankID": "apache bank", + "IBAN": "myIBAN", + "last5Transactions": [ + { + "comment": "gas station", + "amount": "-45" + }, { + "comment": "hair cut", + "amount": "-19" + } + ] + } + ] + }, { + "id": 4, + "name": { + "last": "Clark", + "first": "Jane" + }, + "address": [ + { + "id": "home", + "street": "10 nifi street", + "city": "nifi city" + }, { + "id": "work", + "street": "10 nifi avenue", + "city": "apache city" + } + ], + "bankAccounts": [ + { + "bankID": "nifi bank", + "IBAN": "myIBAN", + "last5Transactions": [ + { + "comment": "gift", + "amount": "+100" + }, { + "comment": "flights", + "amount": "-190" + } + ] + }, { + "bankID": "apache bank", + "IBAN": "myIBAN", + "last5Transactions": [ + { + "comment": "nifi tshirt", + "amount": "0" + }, { + "comment": "theatre", + "amount": "-19" + } + ] + } + ] + } +] \ No newline at end of file diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/extract-address-with-parents.json b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/extract-address-with-parents.json new file mode 100644 index 000000000000..cea9a203abc9 --- /dev/null +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/extract-address-with-parents.json @@ -0,0 +1,121 @@ +[ { + "id" : "home", + "street" : "1 nifi street", + "city" : "nifi city", + "name" : { + "last" : "Doe", + "first" : "John" + }, + "bankAccounts" : [ { + "bankID" : "OneBank", + "IBAN" : "OneIBAN", + "last5Transactions" : [ { + "comment" : "food", + "amount" : "-450" + } ] + } ] +}, { + "id" : "home", + "street" : "1 nifi street", + "city" : "nifi city", + "name" : { + "last" : "Smith", + "first" : "Jane" + }, + "bankAccounts" : [ { + "bankID" : "nifi bank", + "IBAN" : "myIBAN", + "last5Transactions" : null + }, { + "bankID" : "apache bank", + "IBAN" : "myIBAN", + "last5Transactions" : [ { + "comment" : "gas station", + "amount" : "-45" + }, { + "comment" : "hair cut", + "amount" : "-19" + } ] + } ] +}, { + "id" : "work", + "street" : "1 nifi avenue", + "city" : "apache city", + "name" : { + "last" : "Smith", + "first" : "Jane" + }, + "bankAccounts" : [ { + "bankID" : "nifi bank", + "IBAN" : "myIBAN", + "last5Transactions" : null + }, { + "bankID" : "apache bank", + "IBAN" : "myIBAN", + "last5Transactions" : [ { + "comment" : "gas station", + "amount" : "-45" + }, { + "comment" : "hair cut", + "amount" : "-19" + } ] + } ] +}, { + "id" : "home", + "street" : "10 nifi street", + "city" : "nifi city", + "name" : { + "last" : "Clark", + "first" : "Jane" + }, + "bankAccounts" : [ { + "bankID" : "nifi bank", + "IBAN" : "myIBAN", + "last5Transactions" : [ { + "comment" : "gift", + "amount" : "+100" + }, { + "comment" : "flights", + "amount" : "-190" + } ] + }, { + "bankID" : "apache bank", + "IBAN" : "myIBAN", + "last5Transactions" : [ { + "comment" : "nifi tshirt", + "amount" : "0" + }, { + "comment" : "theatre", + "amount" : "-19" + } ] + } ] +}, { + "id" : "work", + "street" : "10 nifi avenue", + "city" : "apache city", + "name" : { + "last" : "Clark", + "first" : "Jane" + }, + "bankAccounts" : [ { + "bankID" : "nifi bank", + "IBAN" : "myIBAN", + "last5Transactions" : [ { + "comment" : "gift", + "amount" : "+100" + }, { + "comment" : "flights", + "amount" : "-190" + } ] + }, { + "bankID" : "apache bank", + "IBAN" : "myIBAN", + "last5Transactions" : [ { + "comment" : "nifi tshirt", + "amount" : "0" + }, { + "comment" : "theatre", + "amount" : "-19" + } ] + } ] +} ] \ No newline at end of file diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/extract-address-without-parents.json b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/extract-address-without-parents.json new file mode 100644 index 000000000000..34c1836cab68 --- /dev/null +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/extract-address-without-parents.json @@ -0,0 +1,21 @@ +[ { + "id" : "home", + "street" : "1 nifi street", + "city" : "nifi city" +}, { + "id" : "home", + "street" : "1 nifi street", + "city" : "nifi city" +}, { + "id" : "work", + "street" : "1 nifi avenue", + "city" : "apache city" +}, { + "id" : "home", + "street" : "10 nifi street", + "city" : "nifi city" +}, { + "id" : "work", + "street" : "10 nifi avenue", + "city" : "apache city" +} ] \ No newline at end of file diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/extract-bank-accounts-with-parents.json b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/extract-bank-accounts-with-parents.json new file mode 100644 index 000000000000..6ebc4eeee0fa --- /dev/null +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestForkRecord/output/extract-bank-accounts-with-parents.json @@ -0,0 +1,108 @@ +[ { + "bankID" : "OneBank", + "IBAN" : "OneIBAN", + "last5Transactions" : [ { + "comment" : "food", + "amount" : "-450" + } ], + "id" : 1, + "name" : { + "last" : "Doe", + "first" : "John" + }, + "address" : [ { + "id" : "home", + "street" : "1 nifi street", + "city" : "nifi city" + } ] +}, { + "bankID" : "nifi bank", + "IBAN" : "myIBAN", + "last5Transactions" : null, + "id" : 3, + "name" : { + "last" : "Smith", + "first" : "Jane" + }, + "address" : [ { + "id" : "home", + "street" : "1 nifi street", + "city" : "nifi city" + }, { + "id" : "work", + "street" : "1 nifi avenue", + "city" : "apache city" + } ] +}, { + "bankID" : "apache bank", + "IBAN" : "myIBAN", + "last5Transactions" : [ { + "comment" : "gas station", + "amount" : "-45" + }, { + "comment" : "hair cut", + "amount" : "-19" + } ], + "id" : 3, + "name" : { + "last" : "Smith", + "first" : "Jane" + }, + "address" : [ { + "id" : "home", + "street" : "1 nifi street", + "city" : "nifi city" + }, { + "id" : "work", + "street" : "1 nifi avenue", + "city" : "apache city" + } ] +}, { + "bankID" : "nifi bank", + "IBAN" : "myIBAN", + "last5Transactions" : [ { + "comment" : "gift", + "amount" : "+100" + }, { + "comment" : "flights", + "amount" : "-190" + } ], + "id" : 4, + "name" : { + "last" : "Clark", + "first" : "Jane" + }, + "address" : [ { + "id" : "home", + "street" : "10 nifi street", + "city" : "nifi city" + }, { + "id" : "work", + "street" : "10 nifi avenue", + "city" : "apache city" + } ] +}, { + "bankID" : "apache bank", + "IBAN" : "myIBAN", + "last5Transactions" : [ { + "comment" : "nifi tshirt", + "amount" : "0" + }, { + "comment" : "theatre", + "amount" : "-19" + } ], + "id" : 4, + "name" : { + "last" : "Clark", + "first" : "Jane" + }, + "address" : [ { + "id" : "home", + "street" : "10 nifi street", + "city" : "nifi city" + }, { + "id" : "work", + "street" : "10 nifi avenue", + "city" : "apache city" + } ] +} ] \ No newline at end of file