diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformRecord.java b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformRecord.java index 3e51258b3ea5..59222286caac 100644 --- a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformRecord.java +++ b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformRecord.java @@ -90,6 +90,14 @@ public class JoltTransformRecord extends AbstractJoltTransform { .required(true) .build(); + static final PropertyDescriptor SCHEMA_WRITING_STRATEGY = new PropertyDescriptor.Builder() + .name("Schema Writing Strategy") + .description("Specifies how the processor should handle records that result in different schemas after transformation.") + .allowableValues(JoltTransformWritingStrategy.class) + .defaultValue(JoltTransformWritingStrategy.USE_FIRST_SCHEMA) + .required(true) + .build(); + static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") .description("The FlowFile with transformed content will be routed to this relationship") @@ -108,6 +116,7 @@ public class JoltTransformRecord extends AbstractJoltTransform { private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = Stream.concat( getCommonPropertyDescriptors().stream(), Stream.of( + SCHEMA_WRITING_STRATEGY, RECORD_READER, RECORD_WRITER ) @@ -131,6 +140,16 @@ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { @Override public void onTrigger(final ProcessContext context, ProcessSession session) throws ProcessException { + final String strategy = context.getProperty(SCHEMA_WRITING_STRATEGY).getValue(); + + if (strategy.equals(JoltTransformWritingStrategy.PARTITION_BY_SCHEMA.getValue())) { + processPartitioned(context, session); + } else { + processUniform(context, session); + } + } + + private void processPartitioned(final ProcessContext context, final ProcessSession session) { final FlowFile original = session.get(); if (original == null) { return; } @@ -142,75 +161,168 @@ public void onTrigger(final ProcessContext context, ProcessSession session) thro final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); - final RecordSchema schema; - FlowFile transformed = null; + // Maps to track resources per Schema + final Map<RecordSchema, FlowFile> flowFileMap = new HashMap<>(); + final Map<RecordSchema, OutputStream> streamMap = new HashMap<>(); + final Map<RecordSchema, RecordSetWriter> writerMap = new HashMap<>(); + final Map<RecordSchema, Integer> recordCounts = new HashMap<>(); + final Map<RecordSchema, WriteResult> writeResults = new HashMap<>(); - try (final InputStream in = session.read(original); - final RecordReader reader = readerFactory.createRecordReader(original, in, getLogger())) { - schema = writerFactory.getSchema(original.getAttributes(), reader.getSchema()); + boolean error = false; - final Map<String, String> attributes = new HashMap<>(); - final WriteResult writeResult; - transformed = session.create(original); + try (final InputStream in = session.read(original); final RecordReader reader = readerFactory.createRecordReader(original, in, getLogger())) { - // We want to transform the first record before creating the Record Writer. 
We do this because the Record will likely end up with a different structure - // and therefore a difference Schema after being transformed. As a result, we want to transform the Record and then provide the transformed schema to the - // Record Writer so that if the Record Writer chooses to inherit the Record Schema from the Record itself, it will inherit the transformed schema, not the - // schema determined by the Record Reader. - final Record firstRecord = reader.nextRecord(); - if (firstRecord == null) { - try (final OutputStream out = session.write(transformed); - final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, out, transformed)) { + final JoltTransform transform = getTransform(context, original); + Record currentRecord; - writer.beginRecordSet(); - writeResult = writer.finishRecordSet(); + while ((currentRecord = reader.nextRecord()) != null) { + final List<Record> transformedRecords = transform(currentRecord, transform); - attributes.put("record.count", String.valueOf(writeResult.getRecordCount())); - attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType()); - attributes.putAll(writeResult.getAttributes()); + if (transformedRecords.isEmpty()) { + continue; } - transformed = session.putAllAttributes(transformed, attributes); - logger.info("{} had no Records to transform", original); - } else { + for (Record transformedRecord : transformedRecords) { + if (transformedRecord == null) { + continue; + } + + final RecordSchema recordSchema = transformedRecord.getSchema(); + final RecordSchema writeSchema = writerFactory.getSchema(original.getAttributes(), recordSchema); + + RecordSetWriter writer = writerMap.get(writeSchema); + + if (writer == null) { + FlowFile newFlowFile = session.create(original); + OutputStream out = session.write(newFlowFile); + writer = writerFactory.createWriter(getLogger(), writeSchema, out, newFlowFile); + + writer.beginRecordSet(); - final JoltTransform transform = getTransform(context, original); - final List<Record> transformedFirstRecords = transform(firstRecord, transform); + flowFileMap.put(writeSchema, newFlowFile); + streamMap.put(writeSchema, out); + writerMap.put(writeSchema, writer); + recordCounts.put(writeSchema, 0); + } + + writer.write(transformedRecord); + recordCounts.put(writeSchema, recordCounts.get(writeSchema) + 1); + } + } + } catch (final Exception e) { + error = true; + logger.error("Transform failed for {}", original, e); + } finally { + // Clean up resources + for (Map.Entry<RecordSchema, RecordSetWriter> entry : writerMap.entrySet()) { + try { + final RecordSetWriter writer = entry.getValue(); + writeResults.put(entry.getKey(), writer.finishRecordSet()); + writer.close(); + } catch (Exception e) { + getLogger().warn("Failed to close Writer", e); + } + } + for (OutputStream out : streamMap.values()) { + try { + out.close(); + } catch (Exception e) { + getLogger().warn("Failed to close OutputStream", e); + } + } + } + + if (error) { + for (FlowFile flowFile : flowFileMap.values()) { + session.remove(flowFile); + } + session.transfer(original, REL_FAILURE); + } else { + if (writerMap.isEmpty()) { + logger.info("{} had no Records to transform (all filtered)", original); + } else { + final String transformType = context.getProperty(JOLT_TRANSFORM).getValue(); + for (Map.Entry<RecordSchema, RecordSetWriter> entry : writerMap.entrySet()) { + RecordSchema schemaKey = entry.getKey(); + RecordSetWriter writer = entry.getValue(); + FlowFile flowFile = flowFileMap.get(schemaKey); + int count = recordCounts.get(schemaKey); + WriteResult writeResult = writeResults.get(schemaKey); + + 
Map<String, String> attributes = new HashMap<>(writeResult.getAttributes()); + attributes.put("record.count", String.valueOf(count)); + attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType()); - if (transformedFirstRecords.isEmpty()) { - throw new ProcessException("Error transforming the first record"); + flowFile = session.putAllAttributes(flowFile, attributes); + session.getProvenanceReporter().modifyContent(flowFile, "Modified With " + transformType, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); + session.transfer(flowFile, REL_SUCCESS); } + } + session.transfer(original, REL_ORIGINAL); + } + } + + private void processUniform(final ProcessContext context, final ProcessSession session) { + final FlowFile original = session.get(); + if (original == null) { + return; + } + + final ComponentLog logger = getLogger(); + final StopWatch stopWatch = new StopWatch(true); + + final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); + final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); + + FlowFile transformed = null; + + try (final InputStream in = session.read(original); final RecordReader reader = readerFactory.createRecordReader(original, in, getLogger())) { + + final JoltTransform transform = getTransform(context, original); - final Record transformedFirstRecord = transformedFirstRecords.getFirst(); - if (transformedFirstRecord == null) { - throw new ProcessException("Error transforming the first record"); + // We need to find the first valid record to determine the schema for the writer + Record firstValidRecord = null; + List<Record> firstValidBatch = null; + Record currentRecord; + + while ((currentRecord = reader.nextRecord()) != null) { + List<Record> transformedRecords = transform(currentRecord, transform); + if (!transformedRecords.isEmpty() && transformedRecords.getFirst() != null) { + firstValidBatch = transformedRecords; + firstValidRecord = transformedRecords.getFirst(); + break; } - final RecordSchema writeSchema = writerFactory.getSchema(original.getAttributes(), transformedFirstRecord.getSchema()); + } - // TODO: Is it possible that two Records with the same input schema could have different schemas after transformation? - // If so, then we need to avoid this pattern of writing all Records from the input FlowFile to the same output FlowFile - // and instead use a Map<RecordSchema, RecordSetWriter>. This way, even if many different output schemas are possible, - // the output FlowFiles will each only contain records that have the same schema. - try (final OutputStream out = session.write(transformed); - final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, out, transformed)) { + final WriteResult writeResult; + final Map<String, String> attributes = new HashMap<>(); + + if (firstValidRecord == null) { + // All records were filtered out (or the input was empty). 
+ logger.info("{} had no Records to transform", original); + } else { + transformed = session.create(original); + + // We have at least one valid record, initialize writer with its schema + final RecordSchema writeSchema = writerFactory.getSchema(original.getAttributes(), firstValidRecord.getSchema()); + + try (final OutputStream out = session.write(transformed); final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, out, transformed)) { writer.beginRecordSet(); - writer.write(transformedFirstRecord); - Record record; - // If multiple output records were generated, write them out - for (int i = 1; i < transformedFirstRecords.size(); i++) { - record = transformedFirstRecords.get(i); - if (record == null) { - throw new ProcessException("Error transforming the first record"); - } - writer.write(record); + // Write the first batch found + for (Record r : firstValidBatch) { + if (r != null) { + writer.write(r); + } } - while ((record = reader.nextRecord()) != null) { - final List<Record> transformedRecords = transform(record, transform); - for (Record transformedRecord : transformedRecords) { - writer.write(transformedRecord); + // Write the remaining records + while ((currentRecord = reader.nextRecord()) != null) { + final List<Record> transformedRecords = transform(currentRecord, transform); + if (!transformedRecords.isEmpty()) { + for (Record r : transformedRecords) { + if (r != null) { + writer.write(r); + } + } + } } } @@ -228,10 +340,12 @@ record = transformedFirstRecords.get(i); } final String transformType = context.getProperty(JOLT_TRANSFORM).getValue(); - transformed = session.putAllAttributes(transformed, attributes); session.getProvenanceReporter().modifyContent(transformed, "Modified With " + transformType, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); - logger.debug("Transform completed {}", original); + + // Only apply attributes if we actually wrote something + transformed = session.putAllAttributes(transformed, attributes); } + } catch (final Exception e) { logger.error("Transform failed for {}", original, e); session.transfer(original, REL_FAILURE); @@ -240,6 +354,7 @@ record = transformedFirstRecords.get(i); } return; } + if (transformed != null) { session.transfer(transformed, REL_SUCCESS); } @@ -248,13 +363,12 @@ record = transformedFirstRecords.get(i); @Override public void migrateProperties(PropertyConfiguration config) { - config.renameProperty("jolt-record-record-reader", RECORD_READER.getName()); - config.renameProperty("jolt-record-record-writer", RECORD_WRITER.getName()); + config.renameProperty("jolt-record-record-reader", RECORD_READER.getName()); + config.renameProperty("jolt-record-record-writer", RECORD_WRITER.getName()); } private List<Record> transform(final Record record, final JoltTransform transform) { Map<String, Object> recordMap = (Map<String, Object>) DataTypeUtils.convertRecordFieldtoObject(record, RecordFieldType.RECORD.getRecordDataType(record.getSchema())); - // JOLT expects arrays to be of type List where our Record code uses Object[]. // Make another pass of the transformed objects to change Object[] to List. recordMap = (Map<String, Object>) normalizeJoltObjects(recordMap); @@ -269,7 +383,8 @@ private List<Record> transfor return recordList; } - // If the top-level object is an array, return a list of the records inside. Otherwise return a singleton list with the single transformed record + // If the top-level object is an array, return a list of the records inside. 
+ // Otherwise return a singleton list with the single transformed record if (normalizedRecordValues instanceof Object[]) { for (Object o : (Object[]) normalizedRecordValues) { if (o != null) { @@ -284,11 +399,13 @@ private List<Record> transfor protected static Object transform(JoltTransform joltTransform, Object input) { return joltTransform instanceof ContextualTransform - ? ((ContextualTransform) joltTransform).transform(input, Collections.emptyMap()) : ((Transform) joltTransform).transform(input); + ? ((ContextualTransform) joltTransform).transform(input, Collections.emptyMap()) + : ((Transform) joltTransform).transform(input); } /** - * Recursively replace List objects with Object[]. JOLT expects arrays to be of type List where our Record code uses Object[]. + * Recursively replace List objects with Object[]. + * JOLT expects arrays to be of type List where our Record code uses Object[]. * * @param o The object to normalize with respect to JOLT */ @@ -332,4 +449,4 @@ protected static Object normalizeRecordObjects(final Object o) { return o; } } -} +} \ No newline at end of file diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformWritingStrategy.java b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformWritingStrategy.java new file mode 100644 index 000000000000..df1523a90eb1 --- /dev/null +++ b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformWritingStrategy.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.jolt; + +import org.apache.nifi.components.DescribedValue; + +public enum JoltTransformWritingStrategy implements DescribedValue { + USE_FIRST_SCHEMA("The schema of the first successfully transformed record is used to write the entire FlowFile. Subsequent records that do not match this schema may be written incorrectly."), + PARTITION_BY_SCHEMA("Each unique schema generated by the transformation results in a separate output FlowFile. 
Useful when Jolt produces variable output structures."); + + private final String description; + + JoltTransformWritingStrategy(final String description) { + this.description = description; + } + + @Override + public String getValue() { + return name(); + } + + @Override + public String getDisplayName() { + return name(); + } + + @Override + public String getDescription() { + return description; + } +} diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestJoltTransformRecord.java b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestBaseJoltTransformRecord.java similarity index 90% rename from nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestJoltTransformRecord.java rename to nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestBaseJoltTransformRecord.java index 308bb9dd4213..b1a775d15cb7 100644 --- a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestJoltTransformRecord.java +++ b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestBaseJoltTransformRecord.java @@ -19,10 +19,8 @@ import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.jolt.util.JoltTransformStrategy; import org.apache.nifi.json.JsonRecordSetWriter; -import org.apache.nifi.json.JsonTreeReader; import org.apache.nifi.processor.Relationship; import org.apache.nifi.schema.access.SchemaAccessUtils; -import org.apache.nifi.schema.inference.SchemaInferenceUtil; import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.record.MapRecord; import org.apache.nifi.serialization.record.MockRecordParser; @@ -37,8 +35,6 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.condition.DisabledOnOs; -import org.junit.jupiter.api.condition.OS; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -61,15 +57,14 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; -@DisabledOnOs(OS.WINDOWS) //The pretty printed json comparisons don't work on windows -public class TestJoltTransformRecord { +public abstract class TestBaseJoltTransformRecord { final static String CHAINR_SPEC_PATH = "src/test/resources/specs/chainrSpec.json"; static String chainrSpecContents; - private TestRunner runner; - private JoltTransformRecord processor; - private MockRecordParser parser; - private JsonRecordSetWriter writer; + protected TestRunner runner; + protected JoltTransformRecord processor; + protected MockRecordParser parser; + protected JsonRecordSetWriter writer; @BeforeAll static void setUpBeforeAll() throws Exception { @@ -88,9 +83,12 @@ public void setup() throws Exception { runner.addControllerService("writer", writer); runner.setProperty(writer, "Schema Write Strategy", "full-schema-attribute"); runner.setProperty(JoltTransformRecord.RECORD_WRITER, "writer"); + runner.setProperty(JoltTransformRecord.SCHEMA_WRITING_STRATEGY, getWritingStrategy()); // Each test must set the Schema Access strategy and Schema, and enable the writer CS } + protected abstract String 
getWritingStrategy(); + @Test public void testRelationshipsCreated() throws IOException { generateTestData(1, null); @@ -207,7 +205,7 @@ public void testNoRecords() throws IOException { runner.run(); runner.assertQueueEmpty(); runner.assertTransferCount(JoltTransformRecord.REL_FAILURE, 0); - runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1); + runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 0); runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); } @@ -301,8 +299,7 @@ public void testTransformInputWithChainr(Path specPath, String ignoredDescriptio final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).getFirst(); transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key()); transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json"); - assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/chainrOutput.json")), - new String(transformed.toByteArray())); + assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/chainrOutput.json")), new String(transformed.toByteArray())); } @Test @@ -323,8 +320,7 @@ public void testTransformInputWithShiftr() throws IOException { final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).getFirst(); transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key()); transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json"); - assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrOutput.json")), - new String(transformed.toByteArray())); + assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrOutput.json")), new String(transformed.toByteArray())); } @Test @@ -343,10 +339,9 @@ public void testTransformInputWithShiftrAccentedChars() throws IOException { runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1); runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).getFirst(); - transformed.assertAttributeExists(CoreAttributes.MIME_TYPE .key()); + transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key()); transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json"); - assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrOutput.json")), - new String(transformed.toByteArray())); + assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrOutput.json")), new String(transformed.toByteArray())); } @Test @@ -362,12 +357,12 @@ public void testTransformInputWithShiftrMultipleOutputRecords() throws IOExcepti final Record record1 = new MapRecord(xSchema, Map.of("a", 1, "b", 2, "c", 3)); final Record record2 = new MapRecord(xSchema, Map.of("a", 11, "b", 21, "c", 31)); final Record record3 = new MapRecord(xSchema, Map.of("a", 21, "b", 2, "c", 3)); - final Object[] recordArray1 = new Object[]{record1, record2, record3}; + final Object[] recordArray1 = new Object[] {record1, record2, record3}; parser.addRecord((Object) recordArray1); final Record record4 = new MapRecord(xSchema, Map.of("a", 100, "b", 200, "c", 300)); final Record record5 = new MapRecord(xSchema, Map.of("a", 101, "b", 201, "c", 301)); - final Object[] recordArray2 = new Object[]{record4, record5}; + final Object[] recordArray2 = new Object[] {record4, record5}; parser.addRecord((Object) 
recordArray2); final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrOutputSchemaMultipleOutputRecords.avsc")); @@ -385,8 +380,7 @@ public void testTransformInputWithShiftrMultipleOutputRecords() throws IOExcepti final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).getFirst(); transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key()); transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json"); - assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrOutputMultipleOutputRecords.json")), - new String(transformed.toByteArray())); + assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrOutputMultipleOutputRecords.json")), new String(transformed.toByteArray())); } @Test @@ -407,8 +401,7 @@ public void testTransformInputWithShiftrFromFile() throws IOException { final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).getFirst(); transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key()); transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json"); - assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrOutput.json")), - new String(transformed.toByteArray())); + assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrOutput.json")), new String(transformed.toByteArray())); } @Test @@ -427,8 +420,7 @@ public void testTransformInputWithDefaultr() throws IOException { runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1); runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).getFirst(); - assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrOutput.json")), - new String(transformed.toByteArray())); + assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrOutput.json")), new String(transformed.toByteArray())); } @Test @@ -447,8 +439,7 @@ public void testTransformInputWithRemovr() throws IOException { runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1); runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).getFirst(); - assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/removrOutput.json")), - new String(transformed.toByteArray())); + assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/removrOutput.json")), new String(transformed.toByteArray())); } @@ -468,8 +459,7 @@ public void testTransformInputWithCardinality() throws IOException { runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1); runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).getFirst(); - assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/cardrOutput.json")), - new String(transformed.toByteArray())); + assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/cardrOutput.json")), new String(transformed.toByteArray())); } @@ -489,8 +479,7 @@ public void testTransformInputWithSortr() throws IOException { final MockFlowFile transformed 
= runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).getFirst(); transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key()); transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json"); - assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/sortrOutput.json")), - new String(transformed.toByteArray())); + assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/sortrOutput.json")), new String(transformed.toByteArray())); } @Test @@ -510,8 +499,7 @@ public void testTransformInputWithDefaultrExpressionLanguage() throws IOExceptio runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1); runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).getFirst(); - assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrELOutput.json")), - new String(transformed.toByteArray())); + assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrELOutput.json")), new String(transformed.toByteArray())); } @@ -531,8 +519,7 @@ public void testTransformInputWithModifierDefault() throws IOException { runner.run(); runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1); final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).getFirst(); - assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/modifierDefaultOutput.json")), - new String(transformed.toByteArray())); + assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/modifierDefaultOutput.json")), new String(transformed.toByteArray())); } @Test @@ -551,8 +538,7 @@ public void testTransformInputWithModifierDefine() throws IOException { runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1); runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).getFirst(); - assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/modifierDefineOutput.json")), - new String(transformed.toByteArray())); + assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/modifierDefineOutput.json")), new String(transformed.toByteArray())); } @Test @@ -571,8 +557,7 @@ public void testTransformInputWithModifierOverwrite() throws IOException { runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1); runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).getFirst(); - assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/modifierOverwriteOutput.json")), - new String(transformed.toByteArray())); + assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/modifierOverwriteOutput.json")), new String(transformed.toByteArray())); } @Test @@ -592,8 +577,7 @@ public void testTransformInputWithSortrPopulatedSpec() throws IOException { final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).getFirst(); transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key()); transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json"); - 
assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/sortrOutput.json")), - new String(transformed.toByteArray())); + assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/sortrOutput.json")), new String(transformed.toByteArray())); } @Test @@ -617,8 +601,7 @@ public void testTransformInputCustomTransformationIgnored() throws IOException { final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).getFirst(); transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key()); transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json"); - assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrOutput.json")), - new String(transformed.toByteArray())); + assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrOutput.json")), new String(transformed.toByteArray())); } @Test @@ -661,8 +644,7 @@ public void testJoltSpecEL() throws IOException { runner.setProperty(JoltTransformRecord.JOLT_SPEC, "${joltSpec}"); runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformStrategy.DEFAULTR); - final Map<String, String> attributes = Collections.singletonMap("joltSpec", - "{\"RatingRange\":5,\"rating\":{\"*\":{\"MaxLabel\":\"High\",\"MinLabel\":\"Low\",\"DisplayType\":\"NORMAL\"}}}"); + final Map<String, String> attributes = Collections.singletonMap("joltSpec", "{\"RatingRange\":5,\"rating\":{\"*\":{\"MaxLabel\":\"High\",\"MinLabel\":\"Low\",\"DisplayType\":\"NORMAL\"}}}"); runner.enqueue(new byte[0], attributes); runner.run(); @@ -670,8 +652,7 @@ public void testJoltSpecEL() throws IOException { runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).getFirst(); - assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrOutput.json")), - new String(transformed.toByteArray())); + assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrOutput.json")), new String(transformed.toByteArray())); } @Test @@ -684,62 +665,47 @@ public void testJoltSpecInvalidEL() { } @Test - public void testJoltComplexChoiceField() throws Exception { - final JsonTreeReader reader = new JsonTreeReader(); - runner.addControllerService("reader", reader); - runner.setProperty(reader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaInferenceUtil.INFER_SCHEMA); - runner.enableControllerService(reader); - runner.setProperty(JoltTransformRecord.RECORD_READER, "reader"); + public void testTransformInputAllFiltered() throws IOException { + generateTestData(3, null); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INHERIT_RECORD_SCHEMA); runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); runner.enableControllerService(writer); - final String flattenSpec = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/flattenSpec.json")); + final String flattenSpec = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/filterOutAllSpec.json")); runner.setProperty(JoltTransformRecord.JOLT_SPEC, flattenSpec); runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformStrategy.CHAINR); - final String inputJson = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/input.json")); - runner.enqueue(inputJson); - + runner.enqueue(new byte[0]); runner.run(); - 
runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1); + runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 0); + runner.assertTransferCount(JoltTransformRecord.REL_FAILURE, 0); runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); - - final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).getFirst(); - assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/flattenedOutput.json")), - new String(transformed.toByteArray())); } + private static Stream<Arguments> getChainrArguments() { - return Stream.of( - Arguments.of(Paths.get(CHAINR_SPEC_PATH), "has no single line comments"), + return Stream.of(Arguments.of(Paths.get(CHAINR_SPEC_PATH), "has no single line comments"), Arguments.of(Paths.get("src/test/resources/specs/chainrSpecWithSingleLineComment.json"), "has a single line comment")); } - private void generateTestData(int numRecords, final BiFunction<Integer, MockRecordParser, Void> recordGenerator) { + protected void generateTestData(int numRecords, final BiFunction<Integer, MockRecordParser, Void> recordGenerator) { if (recordGenerator == null) { - final RecordSchema primarySchema = new SimpleRecordSchema(List.of( - new RecordField("value", RecordFieldType.INT.getDataType()))); - final RecordSchema seriesSchema = new SimpleRecordSchema(List.of( - new RecordField("value", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.INT.getDataType())))); - final RecordSchema qualitySchema = new SimpleRecordSchema(List.of( - new RecordField("value", RecordFieldType.INT.getDataType()))); - final RecordSchema ratingSchema = new SimpleRecordSchema(Arrays.asList( - new RecordField("primary", RecordFieldType.RECORD.getDataType()), - new RecordField("series", RecordFieldType.RECORD.getDataType()), - new RecordField("quality", RecordFieldType.RECORD.getDataType()) - )); + final RecordSchema primarySchema = new SimpleRecordSchema(List.of(new RecordField("value", RecordFieldType.INT.getDataType()))); + final RecordSchema seriesSchema = new SimpleRecordSchema(List.of(new RecordField("value", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.INT.getDataType())))); + final RecordSchema qualitySchema = new SimpleRecordSchema(List.of(new RecordField("value", RecordFieldType.INT.getDataType()))); + final RecordSchema ratingSchema = new SimpleRecordSchema( + Arrays.asList(new RecordField("primary", RecordFieldType.RECORD.getDataType()), new RecordField("series", RecordFieldType.RECORD.getDataType()), + new RecordField("quality", RecordFieldType.RECORD.getDataType()))); parser.addSchemaField("rating", RecordFieldType.RECORD); for (int i = 0; i < numRecords; i++) { final Record primaryRecord = new MapRecord(primarySchema, Map.of("value", (10 * i) + 3)); - final Record seriesRecord = new MapRecord(seriesSchema, Map.of("value", new Integer[]{(10 * i) + 5, (10 * i) + 4})); + final Record seriesRecord = new MapRecord(seriesSchema, Map.of("value", new Integer[] {(10 * i) + 5, (10 * i) + 4})); final Record qualityRecord = new MapRecord(qualitySchema, Map.of("value", 3)); - Record ratingRecord = new MapRecord(ratingSchema, Map.of("primary", primaryRecord, - "series", seriesRecord, "quality", qualityRecord)); + Record ratingRecord = new MapRecord(ratingSchema, Map.of("primary", primaryRecord, "series", seriesRecord, "quality", qualityRecord)); parser.addRecord(ratingRecord); } @@ -747,4 +713,4 @@ private void generateTestData(int numRecords, final BiFunction flowFiles = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS); + final String result1 = new 
String(flowFiles.get(0).toByteArray()); + final String result2 = new String(flowFiles.get(1).toByteArray()); + + // The order of the output FlowFiles is non-deterministic, so match them by content + if (result1.contains("TRASH")) { + assertEquals(expectedOutput1, result1); + assertEquals(expectedOutput2, result2); + } else { + assertEquals(expectedOutput2, result1); + assertEquals(expectedOutput1, result2); + } + } +} \ No newline at end of file diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestJoltTransformRecordUniform.java b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestJoltTransformRecordUniform.java new file mode 100644 index 000000000000..d951d5f5274b --- /dev/null +++ b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestJoltTransformRecordUniform.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.jolt; + +import org.apache.nifi.jolt.util.JoltTransformStrategy; +import org.apache.nifi.json.JsonRecordSetWriter; +import org.apache.nifi.json.JsonTreeReader; +import org.apache.nifi.schema.access.SchemaAccessUtils; +import org.apache.nifi.schema.inference.SchemaInferenceUtil; +import org.apache.nifi.util.MockFlowFile; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.DisabledOnOs; +import org.junit.jupiter.api.condition.OS; + +import java.nio.file.Files; +import java.nio.file.Paths; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +@DisabledOnOs(OS.WINDOWS) // The pretty printed json comparisons don't work on windows +public class TestJoltTransformRecordUniform extends TestBaseJoltTransformRecord { + + @Override + protected String getWritingStrategy() { + return JoltTransformWritingStrategy.USE_FIRST_SCHEMA.getValue(); + } + + @Test + public void testJoltComplexChoiceField() throws Exception { + final JsonTreeReader reader = new JsonTreeReader(); + runner.addControllerService("reader", reader); + runner.setProperty(reader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaInferenceUtil.INFER_SCHEMA); + runner.enableControllerService(reader); + runner.setProperty(JoltTransformRecord.RECORD_READER, "reader"); + + runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INHERIT_RECORD_SCHEMA); + runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); + runner.enableControllerService(writer); + + final String flattenSpec = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/flattenSpec.json")); + runner.setProperty(JoltTransformRecord.JOLT_SPEC, flattenSpec); + runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, 
JoltTransformStrategy.CHAINR); + + final String inputJson = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/input.json")); + runner.enqueue(inputJson); + + runner.run(); + + runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1); + runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); + + final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).getFirst(); + assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/flattenedOutput.json")), new String(transformed.toByteArray())); + } +} \ No newline at end of file diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/filterOutAllSpec.json b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/filterOutAllSpec.json new file mode 100644 index 000000000000..397d8b5193cc --- /dev/null +++ b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/filterOutAllSpec.json @@ -0,0 +1,8 @@ +[ + { + "operation": "shift", + "spec": { + "key": "value" + } + } +] \ No newline at end of file diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/multipleSchemasInput.json b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/multipleSchemasInput.json new file mode 100644 index 000000000000..7715a5ad2b41 --- /dev/null +++ b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/multipleSchemasInput.json @@ -0,0 +1,8 @@ +[ + { + "test_field": "" + }, + { + "test_field": "value2" + } +] \ No newline at end of file diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/multipleSchemasOutput1.json b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/multipleSchemasOutput1.json new file mode 100644 index 000000000000..4b95a87b8d29 --- /dev/null +++ b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/multipleSchemasOutput1.json @@ -0,0 +1,3 @@ +[ { + "TRASH" : null +} ] \ No newline at end of file diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/multipleSchemasOutput2.json b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/multipleSchemasOutput2.json new file mode 100644 index 000000000000..e2fe1057be05 --- /dev/null +++ b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/multipleSchemasOutput2.json @@ -0,0 +1,5 @@ +[ { + "value" : { + "test_field" : "value2" + } +} ] \ No newline at end of file diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/multipleSchemasSpec.json b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/multipleSchemasSpec.json new file mode 100644 index 000000000000..e42864532aae --- /dev/null +++ b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/multipleSchemasSpec.json @@ -0,0 +1,13 @@ +[ + { + "operation": "shift", + "spec": { + "test_field": { + "": "TRASH", + "*": { + "$": "value.test_field" + } + } 
+ } + } +] \ No newline at end of file
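
A minimal sketch of how the PARTITION_BY_SCHEMA path is expected to behave with the multipleSchemas fixtures added above. The diff for the partitioned test file is truncated in this patch view, so this only mirrors the visible fixtures and the setup conventions of the other tests in this PR; the class and method names here are illustrative, not part of the patch.

public class TestJoltTransformRecordPartitionedSketch extends TestBaseJoltTransformRecord {

    @Override
    protected String getWritingStrategy() {
        return JoltTransformWritingStrategy.PARTITION_BY_SCHEMA.getValue();
    }

    @Test
    public void testPartitionsBySchema() throws Exception {
        // Infer the record schema from the raw JSON input, as the other JSON-based tests do
        final JsonTreeReader reader = new JsonTreeReader();
        runner.addControllerService("reader", reader);
        runner.setProperty(reader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaInferenceUtil.INFER_SCHEMA);
        runner.enableControllerService(reader);
        runner.setProperty(JoltTransformRecord.RECORD_READER, "reader");

        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INHERIT_RECORD_SCHEMA);
        runner.enableControllerService(writer);

        runner.setProperty(JoltTransformRecord.JOLT_SPEC, Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/multipleSchemasSpec.json")));
        runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformStrategy.CHAINR);

        runner.enqueue(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/multipleSchemasInput.json")));
        runner.run();

        // One output FlowFile per distinct post-transform schema, plus the untouched original
        runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 2);
        runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
        for (MockFlowFile ff : runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS)) {
            // Each of the two fixture records lands in its own partition
            ff.assertAttributeEquals("record.count", "1");
        }
    }
}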
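For context on why the fixtures force two schemas: multipleSchemasSpec.json shifts on the value of test_field. An empty string matches the "" key and is shifted to a top-level TRASH field, while any other value matches "*" and its key name ($) is shifted to value.test_field, producing the two structures in multipleSchemasOutput1.json and multipleSchemasOutput2.json. Under the default USE_FIRST_SCHEMA strategy the same input stays in a single FlowFile written with the first transformed record's schema. A sketch of that contrast, continuing the illustrative setup above:

        // Same reader/writer/spec setup as the sketch above, but with the default strategy
        runner.setProperty(JoltTransformRecord.SCHEMA_WRITING_STRATEGY, JoltTransformWritingStrategy.USE_FIRST_SCHEMA.getValue());
        runner.enqueue(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/multipleSchemasInput.json")));
        runner.run();

        // A single output FlowFile containing both records
        runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1);
        final MockFlowFile single = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).getFirst();
        single.assertAttributeEquals("record.count", "2");
        // Both records are written with the first record's schema (the TRASH structure),
        // so the second record's value.test_field structure may not survive intact.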