From d742cea23b79bd579a2abae20c68eae71041e537 Mon Sep 17 00:00:00 2001 From: Jordan Sammut Date: Tue, 18 Nov 2025 21:06:56 +0100 Subject: [PATCH 1/8] NIFI-15209 Introduction of a new property for JoltTransformRecord, which will enable the outputting of different schemas by having multiple output flowfiles --- .../processors/jolt/JoltTransformRecord.java | 306 +++++++++++------- .../jolt/JoltTransformWritingStrategy.java | 46 +++ ....java => TestBaseJoltTransformRecord.java} | 196 +++++------ .../TestJoltTransformRecordPartitioned.java | 69 ++++ .../jolt/TestJoltTransformRecordUniform.java | 66 ++++ .../TestCustomJoltTransform.jar | Bin .../cardrOutput.json | 0 .../cardrOutputSchema.avsc | 0 .../chainrOutput.json | 0 .../chainrOutputSchema.avsc | 0 .../defaultrELOutput.json | 0 .../defaultrELOutputSchema.avsc | 0 .../defaultrOutput.json | 0 .../defaultrOutputSchema.avsc | 0 .../flattenSpec.json | 0 .../flattenedOutput.json | 0 .../input.json | 0 .../inputSchema.avsc | 0 .../modifierDefaultOutput.json | 0 .../modifierDefineOutput.json | 0 .../modifierDefineOutputSchema.avsc | 0 .../modifierOverwriteOutput.json | 0 .../modifierOverwriteOutputSchema.avsc | 0 .../multipleChainrOutput.json | 0 .../multipleChainrOutputSchema.avsc | 0 .../multipleChainrSpec.json | 0 .../multipleToMultipleChainrOutput.json | 0 .../multipleToMultipleChainrOutputSchema.avsc | 0 .../multipleToMultipleChainrSpec.json | 0 .../removrOutput.json | 0 .../removrOutputSchema.avsc | 0 .../shiftrOutput.json | 0 .../shiftrOutputMultipleOutputRecords.json | 0 .../shiftrOutputSchema.avsc | 0 ...iftrOutputSchemaMultipleOutputRecords.avsc | 0 .../shiftrSpecMultipleOutputRecords.json | 0 .../sortrOutput.json | 0 .../sortrOutputSchema.avsc | 0 .../multipleSchemasInput.json | 8 + .../multipleSchemasOutput1.json | 3 + .../multipleSchemasOutput2.json | 5 + .../multipleSchemasSpec.json | 13 + 42 files changed, 476 insertions(+), 236 deletions(-) create mode 100644 nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformWritingStrategy.java rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/{TestJoltTransformRecord.java => TestBaseJoltTransformRecord.java} (83%) create mode 100644 nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestJoltTransformRecordPartitioned.java create mode 100644 nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestJoltTransformRecordUniform.java rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestJoltTransformRecord => TestBaseJoltTransformRecord}/TestCustomJoltTransform.jar (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestJoltTransformRecord => TestBaseJoltTransformRecord}/cardrOutput.json (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestJoltTransformRecord => TestBaseJoltTransformRecord}/cardrOutputSchema.avsc (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestJoltTransformRecord => TestBaseJoltTransformRecord}/chainrOutput.json (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestJoltTransformRecord => TestBaseJoltTransformRecord}/chainrOutputSchema.avsc (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestJoltTransformRecord => TestBaseJoltTransformRecord}/defaultrELOutput.json (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestJoltTransformRecord => TestBaseJoltTransformRecord}/defaultrELOutputSchema.avsc (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestJoltTransformRecord => TestBaseJoltTransformRecord}/defaultrOutput.json (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestJoltTransformRecord => TestBaseJoltTransformRecord}/defaultrOutputSchema.avsc (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestJoltTransformRecord => TestBaseJoltTransformRecord}/flattenSpec.json (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestJoltTransformRecord => TestBaseJoltTransformRecord}/flattenedOutput.json (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestJoltTransformRecord => TestBaseJoltTransformRecord}/input.json (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestJoltTransformRecord => TestBaseJoltTransformRecord}/inputSchema.avsc (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestJoltTransformRecord => TestBaseJoltTransformRecord}/modifierDefaultOutput.json (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestJoltTransformRecord => TestBaseJoltTransformRecord}/modifierDefineOutput.json (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestJoltTransformRecord => TestBaseJoltTransformRecord}/modifierDefineOutputSchema.avsc (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestJoltTransformRecord => TestBaseJoltTransformRecord}/modifierOverwriteOutput.json (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestJoltTransformRecord => TestBaseJoltTransformRecord}/modifierOverwriteOutputSchema.avsc (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestJoltTransformRecord => TestBaseJoltTransformRecord}/multipleChainrOutput.json (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestJoltTransformRecord => TestBaseJoltTransformRecord}/multipleChainrOutputSchema.avsc (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestJoltTransformRecord => TestBaseJoltTransformRecord}/multipleChainrSpec.json (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestJoltTransformRecord => TestBaseJoltTransformRecord}/multipleToMultipleChainrOutput.json (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestJoltTransformRecord => TestBaseJoltTransformRecord}/multipleToMultipleChainrOutputSchema.avsc (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestJoltTransformRecord => TestBaseJoltTransformRecord}/multipleToMultipleChainrSpec.json (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestJoltTransformRecord => TestBaseJoltTransformRecord}/removrOutput.json (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestJoltTransformRecord => TestBaseJoltTransformRecord}/removrOutputSchema.avsc (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestJoltTransformRecord => TestBaseJoltTransformRecord}/shiftrOutput.json (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestJoltTransformRecord => TestBaseJoltTransformRecord}/shiftrOutputMultipleOutputRecords.json (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestJoltTransformRecord => TestBaseJoltTransformRecord}/shiftrOutputSchema.avsc (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestJoltTransformRecord => TestBaseJoltTransformRecord}/shiftrOutputSchemaMultipleOutputRecords.avsc (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestJoltTransformRecord => TestBaseJoltTransformRecord}/shiftrSpecMultipleOutputRecords.json (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestJoltTransformRecord => TestBaseJoltTransformRecord}/sortrOutput.json (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestJoltTransformRecord => TestBaseJoltTransformRecord}/sortrOutputSchema.avsc (100%) create mode 100644 nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecordPartitioned/multipleSchemasInput.json create mode 100644 nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecordPartitioned/multipleSchemasOutput1.json create mode 100644 nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecordPartitioned/multipleSchemasOutput2.json create mode 100644 nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecordPartitioned/multipleSchemasSpec.json diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformRecord.java b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformRecord.java index 3e51258b3ea5..270e5a10aa03 100644 --- a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformRecord.java +++ b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformRecord.java @@ -47,7 +47,6 @@ import org.apache.nifi.serialization.record.util.DataTypeUtils; import org.apache.nifi.util.StopWatch; -import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.ArrayList; @@ -66,58 +65,37 @@ @SupportsBatching @Tags({"record", "jolt", "transform", "shiftr", "chainr", "defaultr", "removr", "cardinality", "sort"}) @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) -@WritesAttributes({ - @WritesAttribute(attribute = "record.count", description = "The number of records in an outgoing FlowFile"), - @WritesAttribute(attribute = "mime.type", description = "The MIME Type that the configured Record Writer indicates is appropriate"), -}) -@CapabilityDescription("Applies a JOLT specification to each record in the FlowFile payload. A new FlowFile is created " - + "with transformed content and is routed to the 'success' relationship. If the transform " - + "fails, the original FlowFile is routed to the 'failure' relationship.") +@WritesAttributes({@WritesAttribute(attribute = "record.count", description = "The number of records in an outgoing FlowFile"), + @WritesAttribute(attribute = "mime.type", description = "The MIME " + "Type that the configured Record Writer indicates is appropriate")}) +@CapabilityDescription("Applies a JOLT specification to each record in the FlowFile payload. A new FlowFile is created " + "with transformed content and is routed to the 'success' relationship. If " + + "the transform " + "fails, the original FlowFile is routed to the 'failure' relationship.") @RequiresInstanceClassLoading public class JoltTransformRecord extends AbstractJoltTransform { - static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder() - .name("Record Reader") - .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.") - .identifiesControllerService(RecordReaderFactory.class) - .required(true) - .build(); - - static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder() - .name("Record Writer") - .description("Specifies the Controller Service to use for writing out the records") - .identifiesControllerService(RecordSetWriterFactory.class) - .required(true) - .build(); - - static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("The FlowFile with transformed content will be routed to this relationship") - .build(); - - static final Relationship REL_FAILURE = new Relationship.Builder() - .name("failure") - .description("If a FlowFile fails processing for any reason (for example, the FlowFile records cannot be parsed), it will be routed to this relationship") - .build(); - - static final Relationship REL_ORIGINAL = new Relationship.Builder() - .name("original") - .description("The original FlowFile that was transformed. If the FlowFile fails processing, nothing will be sent to this relationship") - .build(); - - private static final List PROPERTY_DESCRIPTORS = Stream.concat( - getCommonPropertyDescriptors().stream(), - Stream.of( - RECORD_READER, - RECORD_WRITER - ) - ).toList(); - - private static final Set RELATIONSHIPS = Set.of( - REL_SUCCESS, - REL_FAILURE, - REL_ORIGINAL - ); + static final PropertyDescriptor RECORD_READER = + new PropertyDescriptor.Builder().name("Record Reader").description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.") + .identifiesControllerService(RecordReaderFactory.class).required(true).build(); + + static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder().name("Record Writer").description("Specifies the Controller Service to use for writing out the records") + .identifiesControllerService(RecordSetWriterFactory.class).required(true).build(); + + static final PropertyDescriptor SCHEMA_WRITING_STRATEGY = + new PropertyDescriptor.Builder().name("Schema Writing Strategy").description("Specifies how the processor should handle records that result in different schemas after transformation.") + .allowableValues(JoltTransformWritingStrategy.class).defaultValue(JoltTransformWritingStrategy.USE_FIRST_SCHEMA).required(true).build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("The FlowFile with transformed content will be routed to this relationship").build(); + + static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") + .description("If a FlowFile fails processing for any reason (for example, the FlowFile records cannot be " + "parsed), it will be routed to this relationship").build(); + + static final Relationship REL_ORIGINAL = + new Relationship.Builder().name("original").description("The original FlowFile that was transformed. If the FlowFile fails processing, nothing will be " + "sent to this relationship") + .build(); + + private static final List PROPERTY_DESCRIPTORS = + Stream.concat(getCommonPropertyDescriptors().stream(), Stream.of(SCHEMA_WRITING_STRATEGY, RECORD_READER, RECORD_WRITER)).toList(); + + private static final Set RELATIONSHIPS = Set.of(REL_SUCCESS, REL_FAILURE, REL_ORIGINAL); @Override public Set getRelationships() { @@ -131,6 +109,16 @@ protected List getSupportedPropertyDescriptors() { @Override public void onTrigger(final ProcessContext context, ProcessSession session) throws ProcessException { + final String strategy = context.getProperty(SCHEMA_WRITING_STRATEGY).getValue(); + + if (strategy.equals(JoltTransformWritingStrategy.PARTITION_BY_SCHEMA.getValue())) { + processPartitioned(context, session); + } else { + processUniform(context, session); + } + } + + private void processPartitioned(final ProcessContext context, final ProcessSession session) { final FlowFile original = session.get(); if (original == null) { return; @@ -142,96 +130,187 @@ public void onTrigger(final ProcessContext context, ProcessSession session) thro final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); - final RecordSchema schema; - FlowFile transformed = null; + // Maps to track resources per Schema + final Map flowFileMap = new HashMap<>(); + final Map streamMap = new HashMap<>(); + final Map writerMap = new HashMap<>(); + final Map recordCounts = new HashMap<>(); + final Map writeResults = new HashMap<>(); - try (final InputStream in = session.read(original); - final RecordReader reader = readerFactory.createRecordReader(original, in, getLogger())) { - schema = writerFactory.getSchema(original.getAttributes(), reader.getSchema()); + boolean error = false; - final Map attributes = new HashMap<>(); - final WriteResult writeResult; - transformed = session.create(original); + try (final InputStream in = session.read(original); final RecordReader reader = readerFactory.createRecordReader(original, in, getLogger())) { - // We want to transform the first record before creating the Record Writer. We do this because the Record will likely end up with a different structure - // and therefore a difference Schema after being transformed. As a result, we want to transform the Record and then provide the transformed schema to the - // Record Writer so that if the Record Writer chooses to inherit the Record Schema from the Record itself, it will inherit the transformed schema, not the - // schema determined by the Record Reader. - final Record firstRecord = reader.nextRecord(); - if (firstRecord == null) { - try (final OutputStream out = session.write(transformed); - final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, out, transformed)) { + final JoltTransform transform = getTransform(context, original); + Record currentRecord; - writer.beginRecordSet(); - writeResult = writer.finishRecordSet(); + while ((currentRecord = reader.nextRecord()) != null) { + final List transformedRecords = transform(currentRecord, transform); - attributes.put("record.count", String.valueOf(writeResult.getRecordCount())); - attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType()); - attributes.putAll(writeResult.getAttributes()); + if (transformedRecords.isEmpty()) { + continue; } - transformed = session.putAllAttributes(transformed, attributes); - logger.info("{} had no Records to transform", original); - } else { + for (Record transformedRecord : transformedRecords) { + if (transformedRecord == null) { + continue; + } + + final RecordSchema recordSchema = transformedRecord.getSchema(); + final RecordSchema writeSchema = writerFactory.getSchema(original.getAttributes(), recordSchema); + + RecordSetWriter writer = writerMap.get(writeSchema); - final JoltTransform transform = getTransform(context, original); - final List transformedFirstRecords = transform(firstRecord, transform); + if (writer == null) { + FlowFile newFlowFile = session.create(original); + OutputStream out = session.write(newFlowFile); + writer = writerFactory.createWriter(getLogger(), writeSchema, out, newFlowFile); + + writer.beginRecordSet(); + + flowFileMap.put(writeSchema, newFlowFile); + streamMap.put(writeSchema, out); + writerMap.put(writeSchema, writer); + recordCounts.put(writeSchema, 0); + } + + writer.write(transformedRecord); + recordCounts.put(writeSchema, recordCounts.get(writeSchema) + 1); + } + } + } catch (final Exception e) { + error = true; + logger.error("Transform failed for {}", original, e); + } finally { + // Clean up resources + for (Map.Entry entry : writerMap.entrySet()) { + try { + final RecordSetWriter writer = entry.getValue(); + writeResults.put(entry.getKey(), writer.finishRecordSet()); + writer.close(); + } catch (Exception e) { + getLogger().warn("Failed to close Writer", e); + } + } + for (OutputStream out : streamMap.values()) { + try { + out.close(); + } catch (Exception e) { + getLogger().warn("Failed to close OutputStream", e); + } + } + } + + if (error) { + for (FlowFile flowFile : flowFileMap.values()) { + session.remove(flowFile); + } + session.transfer(original, REL_FAILURE); + } else { + if (writerMap.isEmpty()) { + logger.info("{} had no Records to transform (all filtered)", original); + } else { + final String transformType = context.getProperty(JOLT_TRANSFORM).getValue(); + for (Map.Entry entry : writerMap.entrySet()) { + RecordSchema schemaKey = entry.getKey(); + RecordSetWriter writer = entry.getValue(); + FlowFile flowFile = flowFileMap.get(schemaKey); + int count = recordCounts.get(schemaKey); + WriteResult writeResult = writeResults.get(schemaKey); + + Map attributes = new HashMap<>(writeResult.getAttributes()); + attributes.put("record.count", String.valueOf(count)); + attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType()); - if (transformedFirstRecords.isEmpty()) { - throw new ProcessException("Error transforming the first record"); + flowFile = session.putAllAttributes(flowFile, attributes); + session.getProvenanceReporter().modifyContent(flowFile, "Modified With " + transformType, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); + session.transfer(flowFile, REL_SUCCESS); } + } + session.transfer(original, REL_ORIGINAL); + } + } + + private void processUniform(final ProcessContext context, final ProcessSession session) { + final FlowFile original = session.get(); + if (original == null) { + return; + } + + final ComponentLog logger = getLogger(); + final StopWatch stopWatch = new StopWatch(true); + + final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); + final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); + + FlowFile transformed = null; - final Record transformedFirstRecord = transformedFirstRecords.getFirst(); - if (transformedFirstRecord == null) { - throw new ProcessException("Error transforming the first record"); + try (final InputStream in = session.read(original); final RecordReader reader = readerFactory.createRecordReader(original, in, getLogger())) { + + final JoltTransform transform = getTransform(context, original); + + // We need to find the first VALID record to determine the schema for the writer + Record firstValidRecord = null; + List firstValidBatch = null; + Record currentRecord; + + while ((currentRecord = reader.nextRecord()) != null) { + List transformedRecords = transform(currentRecord, transform); + if (transformedRecords != null && !transformedRecords.isEmpty() && transformedRecords.getFirst() != null) { + firstValidBatch = transformedRecords; + firstValidRecord = transformedRecords.getFirst(); + break; } - final RecordSchema writeSchema = writerFactory.getSchema(original.getAttributes(), transformedFirstRecord.getSchema()); + } - // TODO: Is it possible that two Records with the same input schema could have different schemas after transformation? - // If so, then we need to avoid this pattern of writing all Records from the input FlowFile to the same output FlowFile - // and instead use a Map. This way, even if many different output schemas are possible, - // the output FlowFiles will each only contain records that have the same schema. - try (final OutputStream out = session.write(transformed); - final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, out, transformed)) { + transformed = session.create(original); + final WriteResult writeResult; + final Map attributes = new HashMap<>(); + + if (firstValidRecord == null) { + // UPDATED LOGIC: + // All records were filtered out (or input was empty). + // The test expects 0 output files. We must remove the FlowFile we created + // and ensure 'transformed' is null so it isn't transferred to SUCCESS later. + session.remove(transformed); + transformed = null; + logger.info("{} had no Records to transform", original); + } else { + // We have at least one valid record, initialize writer with its schema + final RecordSchema writeSchema = writerFactory.getSchema(original.getAttributes(), firstValidRecord.getSchema()); + + try (final OutputStream out = session.write(transformed); final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, out, transformed)) { writer.beginRecordSet(); - writer.write(transformedFirstRecord); - Record record; - // If multiple output records were generated, write them out - for (int i = 1; i < transformedFirstRecords.size(); i++) { - record = transformedFirstRecords.get(i); - if (record == null) { - throw new ProcessException("Error transforming the first record"); - } - writer.write(record); + // Write the first batch found + for (Record r : firstValidBatch) { + if (r != null) writer.write(r); } - while ((record = reader.nextRecord()) != null) { - final List transformedRecords = transform(record, transform); - for (Record transformedRecord : transformedRecords) { - writer.write(transformedRecord); + // Write the rest + while ((currentRecord = reader.nextRecord()) != null) { + final List transformedRecords = transform(currentRecord, transform); + if (transformedRecords != null && !transformedRecords.isEmpty()) { + for (Record r : transformedRecords) { + if (r != null) writer.write(r); + } } } writeResult = writer.finishRecordSet(); - - try { - writer.close(); - } catch (final IOException ioe) { - getLogger().warn("Failed to close Writer for {}", transformed); - } - attributes.put("record.count", String.valueOf(writeResult.getRecordCount())); attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType()); attributes.putAll(writeResult.getAttributes()); } final String transformType = context.getProperty(JOLT_TRANSFORM).getValue(); - transformed = session.putAllAttributes(transformed, attributes); session.getProvenanceReporter().modifyContent(transformed, "Modified With " + transformType, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); - logger.debug("Transform completed {}", original); + + // Only apply attributes if we actually wrote something + transformed = session.putAllAttributes(transformed, attributes); } + } catch (final Exception e) { logger.error("Transform failed for {}", original, e); session.transfer(original, REL_FAILURE); @@ -240,6 +319,7 @@ record = transformedFirstRecords.get(i); } return; } + if (transformed != null) { session.transfer(transformed, REL_SUCCESS); } @@ -248,8 +328,8 @@ record = transformedFirstRecords.get(i); @Override public void migrateProperties(PropertyConfiguration config) { - config.renameProperty("jolt-record-record-reader", RECORD_READER.getName()); - config.renameProperty("jolt-record-record-writer", RECORD_WRITER.getName()); + config.renameProperty("jolt-record-record-reader", RECORD_READER.getName()); + config.renameProperty("jolt-record-record-writer", RECORD_WRITER.getName()); } private List transform(final Record record, final JoltTransform transform) { @@ -332,4 +412,4 @@ protected static Object normalizeRecordObjects(final Object o) { return o; } } -} +} \ No newline at end of file diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformWritingStrategy.java b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformWritingStrategy.java new file mode 100644 index 000000000000..df1523a90eb1 --- /dev/null +++ b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformWritingStrategy.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.jolt; + +import org.apache.nifi.components.DescribedValue; + +public enum JoltTransformWritingStrategy implements DescribedValue { + USE_FIRST_SCHEMA("The first successful transformation determines the schema for the entire FlowFile. All subsequent records must adhere to this schema or they may be invalid."), + PARTITION_BY_SCHEMA("Each unique schema generated by the transformation results in a separate output FlowFile. Useful when Jolt produces variable output structures."); + + private final String description; + + JoltTransformWritingStrategy(final String description) { + this.description = description; + } + + @Override + public String getValue() { + return name(); + } + + @Override + public String getDisplayName() { + return name(); + } + + @Override + public String getDescription() { + return description; + } +} diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestJoltTransformRecord.java b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestBaseJoltTransformRecord.java similarity index 83% rename from nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestJoltTransformRecord.java rename to nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestBaseJoltTransformRecord.java index 308bb9dd4213..3b548e26ac91 100644 --- a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestJoltTransformRecord.java +++ b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestBaseJoltTransformRecord.java @@ -19,10 +19,8 @@ import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.jolt.util.JoltTransformStrategy; import org.apache.nifi.json.JsonRecordSetWriter; -import org.apache.nifi.json.JsonTreeReader; import org.apache.nifi.processor.Relationship; import org.apache.nifi.schema.access.SchemaAccessUtils; -import org.apache.nifi.schema.inference.SchemaInferenceUtil; import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.record.MapRecord; import org.apache.nifi.serialization.record.MockRecordParser; @@ -62,14 +60,14 @@ import static org.junit.jupiter.api.Assertions.assertTrue; @DisabledOnOs(OS.WINDOWS) //The pretty printed json comparisons don't work on windows -public class TestJoltTransformRecord { +public abstract class TestBaseJoltTransformRecord { final static String CHAINR_SPEC_PATH = "src/test/resources/specs/chainrSpec.json"; static String chainrSpecContents; - private TestRunner runner; - private JoltTransformRecord processor; - private MockRecordParser parser; - private JsonRecordSetWriter writer; + protected TestRunner runner; + protected JoltTransformRecord processor; + protected MockRecordParser parser; + protected JsonRecordSetWriter writer; @BeforeAll static void setUpBeforeAll() throws Exception { @@ -88,13 +86,16 @@ public void setup() throws Exception { runner.addControllerService("writer", writer); runner.setProperty(writer, "Schema Write Strategy", "full-schema-attribute"); runner.setProperty(JoltTransformRecord.RECORD_WRITER, "writer"); + runner.setProperty(JoltTransformRecord.SCHEMA_WRITING_STRATEGY, getWritingStrategy()); // Each test must set the Schema Access strategy and Schema, and enable the writer CS } + protected abstract String getWritingStrategy(); + @Test public void testRelationshipsCreated() throws IOException { generateTestData(1, null); - final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/chainrOutputSchema.avsc")); + final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/chainrOutputSchema.avsc")); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); @@ -111,7 +112,7 @@ public void testRelationshipsCreated() throws IOException { @Test public void testRelationshipsCreatedFromFile() throws IOException { generateTestData(1, null); - final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/chainrOutputSchema.avsc")); + final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/chainrOutputSchema.avsc")); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); @@ -128,7 +129,7 @@ public void testRelationshipsCreatedFromFile() throws IOException { @Test public void testInvalidJOLTSpec() throws IOException { generateTestData(1, null); - final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrOutputSchema.avsc")); + final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/shiftrOutputSchema.avsc")); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); @@ -157,7 +158,7 @@ public void testSpecIsNotSet() { @Test public void testSpecIsEmpty() throws IOException { generateTestData(1, null); - final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrOutputSchema.avsc")); + final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/shiftrOutputSchema.avsc")); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); @@ -170,7 +171,7 @@ public void testSpecIsEmpty() throws IOException { @Test public void testSpecNotRequired() throws IOException { generateTestData(1, null); - final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrOutputSchema.avsc")); + final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/shiftrOutputSchema.avsc")); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); @@ -182,7 +183,7 @@ public void testSpecNotRequired() throws IOException { @Test public void testNoFlowFileContent() throws IOException { generateTestData(1, null); - final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/chainrOutputSchema.avsc")); + final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/chainrOutputSchema.avsc")); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); @@ -197,7 +198,7 @@ public void testNoFlowFileContent() throws IOException { @Test public void testNoRecords() throws IOException { generateTestData(0, null); - final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/chainrOutputSchema.avsc")); + final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/chainrOutputSchema.avsc")); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); @@ -207,7 +208,7 @@ public void testNoRecords() throws IOException { runner.run(); runner.assertQueueEmpty(); runner.assertTransferCount(JoltTransformRecord.REL_FAILURE, 0); - runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1); + runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 0); runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); } @@ -215,7 +216,7 @@ public void testNoRecords() throws IOException { public void testInvalidFlowFileContent() throws IOException { generateTestData(1, null); - final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/chainrOutputSchema.avsc")); + final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/chainrOutputSchema.avsc")); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); @@ -233,7 +234,7 @@ public void testInvalidFlowFileContent() throws IOException { @Test public void testCustomTransformationWithNoModule() throws IOException { generateTestData(1, null); - final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/chainrOutputSchema.avsc")); + final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/chainrOutputSchema.avsc")); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); @@ -248,12 +249,12 @@ public void testCustomTransformationWithNoModule() throws IOException { @Test public void testCustomTransformationWithMissingClassName() throws IOException { generateTestData(1, null); - final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/chainrOutputSchema.avsc")); + final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/chainrOutputSchema.avsc")); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); runner.enableControllerService(writer); - final String customJarPath = "src/test/resources/TestJoltTransformRecord/TestCustomJoltTransform.jar"; + final String customJarPath = "src/test/resources/TestBaseJoltTransformRecord/TestCustomJoltTransform.jar"; runner.setProperty(JoltTransformRecord.JOLT_SPEC, chainrSpecContents); runner.setProperty(JoltTransformRecord.MODULES, customJarPath); runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformStrategy.CUSTOMR); @@ -263,7 +264,7 @@ public void testCustomTransformationWithMissingClassName() throws IOException { @Test public void testCustomTransformationWithInvalidClassPath() { - final String customJarPath = "src/test/resources/TestJoltTransformRecord/FakeCustomJar.jar"; + final String customJarPath = "src/test/resources/TestBaseJoltTransformRecord/FakeCustomJar.jar"; runner.setProperty(JoltTransformRecord.JOLT_SPEC, chainrSpecContents); runner.setProperty(JoltTransformRecord.CUSTOM_CLASS, "TestCustomJoltTransform"); runner.setProperty(JoltTransformRecord.MODULES, customJarPath); @@ -274,7 +275,7 @@ public void testCustomTransformationWithInvalidClassPath() { @Test public void testCustomTransformationWithInvalidClassName() { - final String customJarPath = "src/test/resources/TestJoltTransformRecord/TestCustomJoltTransform.jar"; + final String customJarPath = "src/test/resources/TestBaseJoltTransformRecord/TestCustomJoltTransform.jar"; runner.setProperty(JoltTransformRecord.JOLT_SPEC, chainrSpecContents); runner.setProperty(JoltTransformRecord.CUSTOM_CLASS, "FakeCustomJoltTransform"); runner.setProperty(JoltTransformRecord.MODULES, customJarPath); @@ -287,7 +288,7 @@ public void testCustomTransformationWithInvalidClassName() { @MethodSource("getChainrArguments") public void testTransformInputWithChainr(Path specPath, String ignoredDescription) throws IOException { generateTestData(1, null); - final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/chainrOutputSchema.avsc")); + final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/chainrOutputSchema.avsc")); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); @@ -301,14 +302,13 @@ public void testTransformInputWithChainr(Path specPath, String ignoredDescriptio final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).getFirst(); transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key()); transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json"); - assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/chainrOutput.json")), - new String(transformed.toByteArray())); + assertEquals(Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/chainrOutput.json")), new String(transformed.toByteArray())); } @Test public void testTransformInputWithShiftr() throws IOException { generateTestData(1, null); - final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrOutputSchema.avsc")); + final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/shiftrOutputSchema.avsc")); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); @@ -323,14 +323,13 @@ public void testTransformInputWithShiftr() throws IOException { final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).getFirst(); transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key()); transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json"); - assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrOutput.json")), - new String(transformed.toByteArray())); + assertEquals(Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/shiftrOutput.json")), new String(transformed.toByteArray())); } @Test public void testTransformInputWithShiftrAccentedChars() throws IOException { generateTestData(1, null); - final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrOutputSchema.avsc")); + final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/shiftrOutputSchema.avsc")); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); @@ -343,10 +342,9 @@ public void testTransformInputWithShiftrAccentedChars() throws IOException { runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1); runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).getFirst(); - transformed.assertAttributeExists(CoreAttributes.MIME_TYPE .key()); + transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key()); transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json"); - assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrOutput.json")), - new String(transformed.toByteArray())); + assertEquals(Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/shiftrOutput.json")), new String(transformed.toByteArray())); } @Test @@ -362,20 +360,20 @@ public void testTransformInputWithShiftrMultipleOutputRecords() throws IOExcepti final Record record1 = new MapRecord(xSchema, Map.of("a", 1, "b", 2, "c", 3)); final Record record2 = new MapRecord(xSchema, Map.of("a", 11, "b", 21, "c", 31)); final Record record3 = new MapRecord(xSchema, Map.of("a", 21, "b", 2, "c", 3)); - final Object[] recordArray1 = new Object[]{record1, record2, record3}; + final Object[] recordArray1 = new Object[] {record1, record2, record3}; parser.addRecord((Object) recordArray1); final Record record4 = new MapRecord(xSchema, Map.of("a", 100, "b", 200, "c", 300)); final Record record5 = new MapRecord(xSchema, Map.of("a", 101, "b", 201, "c", 301)); - final Object[] recordArray2 = new Object[]{record4, record5}; + final Object[] recordArray2 = new Object[] {record4, record5}; parser.addRecord((Object) recordArray2); - final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrOutputSchemaMultipleOutputRecords.avsc")); + final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/shiftrOutputSchemaMultipleOutputRecords.avsc")); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); runner.enableControllerService(writer); - final String spec = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrSpecMultipleOutputRecords.json")); + final String spec = Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/shiftrSpecMultipleOutputRecords.json")); runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec); runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformStrategy.SHIFTR); runner.enqueue(new byte[0]); @@ -385,14 +383,13 @@ public void testTransformInputWithShiftrMultipleOutputRecords() throws IOExcepti final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).getFirst(); transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key()); transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json"); - assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrOutputMultipleOutputRecords.json")), - new String(transformed.toByteArray())); + assertEquals(Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/shiftrOutputMultipleOutputRecords.json")), new String(transformed.toByteArray())); } @Test public void testTransformInputWithShiftrFromFile() throws IOException { generateTestData(1, null); - final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrOutputSchema.avsc")); + final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/shiftrOutputSchema.avsc")); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); @@ -407,14 +404,13 @@ public void testTransformInputWithShiftrFromFile() throws IOException { final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).getFirst(); transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key()); transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json"); - assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrOutput.json")), - new String(transformed.toByteArray())); + assertEquals(Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/shiftrOutput.json")), new String(transformed.toByteArray())); } @Test public void testTransformInputWithDefaultr() throws IOException { generateTestData(1, null); - final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrOutputSchema.avsc")); + final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/defaultrOutputSchema.avsc")); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); @@ -427,14 +423,13 @@ public void testTransformInputWithDefaultr() throws IOException { runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1); runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).getFirst(); - assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrOutput.json")), - new String(transformed.toByteArray())); + assertEquals(Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/defaultrOutput.json")), new String(transformed.toByteArray())); } @Test public void testTransformInputWithRemovr() throws IOException { generateTestData(1, null); - final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/removrOutputSchema.avsc")); + final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/removrOutputSchema.avsc")); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); @@ -447,15 +442,14 @@ public void testTransformInputWithRemovr() throws IOException { runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1); runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).getFirst(); - assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/removrOutput.json")), - new String(transformed.toByteArray())); + assertEquals(Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/removrOutput.json")), new String(transformed.toByteArray())); } @Test public void testTransformInputWithCardinality() throws IOException { generateTestData(1, null); - final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/cardrOutputSchema.avsc")); + final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/cardrOutputSchema.avsc")); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); @@ -468,15 +462,14 @@ public void testTransformInputWithCardinality() throws IOException { runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1); runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).getFirst(); - assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/cardrOutput.json")), - new String(transformed.toByteArray())); + assertEquals(Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/cardrOutput.json")), new String(transformed.toByteArray())); } @Test public void testTransformInputWithSortr() throws IOException { generateTestData(1, null); - final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/sortrOutputSchema.avsc")); + final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/sortrOutputSchema.avsc")); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); @@ -489,14 +482,13 @@ public void testTransformInputWithSortr() throws IOException { final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).getFirst(); transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key()); transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json"); - assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/sortrOutput.json")), - new String(transformed.toByteArray())); + assertEquals(Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/sortrOutput.json")), new String(transformed.toByteArray())); } @Test public void testTransformInputWithDefaultrExpressionLanguage() throws IOException { generateTestData(1, null); - final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrELOutputSchema.avsc")); + final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/defaultrELOutputSchema.avsc")); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); @@ -510,8 +502,7 @@ public void testTransformInputWithDefaultrExpressionLanguage() throws IOExceptio runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1); runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).getFirst(); - assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrELOutput.json")), - new String(transformed.toByteArray())); + assertEquals(Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/defaultrELOutput.json")), new String(transformed.toByteArray())); } @@ -519,7 +510,7 @@ public void testTransformInputWithDefaultrExpressionLanguage() throws IOExceptio public void testTransformInputWithModifierDefault() throws IOException { generateTestData(1, null); // Input schema = output schema, just modifying values - final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/inputSchema.avsc")); + final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/inputSchema.avsc")); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); @@ -531,14 +522,13 @@ public void testTransformInputWithModifierDefault() throws IOException { runner.run(); runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1); final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).getFirst(); - assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/modifierDefaultOutput.json")), - new String(transformed.toByteArray())); + assertEquals(Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/modifierDefaultOutput.json")), new String(transformed.toByteArray())); } @Test public void testTransformInputWithModifierDefine() throws IOException { generateTestData(1, null); - final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/modifierDefineOutputSchema.avsc")); + final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/modifierDefineOutputSchema.avsc")); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); @@ -551,14 +541,13 @@ public void testTransformInputWithModifierDefine() throws IOException { runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1); runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).getFirst(); - assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/modifierDefineOutput.json")), - new String(transformed.toByteArray())); + assertEquals(Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/modifierDefineOutput.json")), new String(transformed.toByteArray())); } @Test public void testTransformInputWithModifierOverwrite() throws IOException { generateTestData(1, null); - final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/modifierOverwriteOutputSchema.avsc")); + final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/modifierOverwriteOutputSchema.avsc")); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); @@ -571,14 +560,13 @@ public void testTransformInputWithModifierOverwrite() throws IOException { runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1); runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).getFirst(); - assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/modifierOverwriteOutput.json")), - new String(transformed.toByteArray())); + assertEquals(Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/modifierOverwriteOutput.json")), new String(transformed.toByteArray())); } @Test public void testTransformInputWithSortrPopulatedSpec() throws IOException { generateTestData(1, null); - final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/sortrOutputSchema.avsc")); + final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/sortrOutputSchema.avsc")); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); @@ -592,19 +580,18 @@ public void testTransformInputWithSortrPopulatedSpec() throws IOException { final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).getFirst(); transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key()); transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json"); - assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/sortrOutput.json")), - new String(transformed.toByteArray())); + assertEquals(Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/sortrOutput.json")), new String(transformed.toByteArray())); } @Test public void testTransformInputCustomTransformationIgnored() throws IOException { generateTestData(1, null); - final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrOutputSchema.avsc")); + final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/defaultrOutputSchema.avsc")); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); runner.enableControllerService(writer); - final String customJarPath = "src/test/resources/TestJoltTransformRecord/TestCustomJoltTransform.jar"; + final String customJarPath = "src/test/resources/TestBaseJoltTransformRecord/TestCustomJoltTransform.jar"; final String spec = Files.readString(Paths.get("src/test/resources/specs/defaultrSpec.json")); runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec); runner.setProperty(JoltTransformRecord.CUSTOM_CLASS, "TestCustomJoltTransform"); @@ -617,19 +604,18 @@ public void testTransformInputCustomTransformationIgnored() throws IOException { final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).getFirst(); transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key()); transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json"); - assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrOutput.json")), - new String(transformed.toByteArray())); + assertEquals(Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/defaultrOutput.json")), new String(transformed.toByteArray())); } @Test public void testExpressionLanguageJarFile() throws IOException { generateTestData(1, null); - final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrOutputSchema.avsc")); + final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/defaultrOutputSchema.avsc")); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); runner.enableControllerService(writer); - URL t = getClass().getResource("/TestJoltTransformRecord/TestCustomJoltTransform.jar"); + URL t = getClass().getResource("/TestBaseJoltTransformRecord/TestCustomJoltTransform.jar"); assertNotNull(t); final String customJarPath = t.getPath(); final String spec = Files.readString(Paths.get("src/test/resources/specs/customChainrSpec.json")); @@ -651,7 +637,7 @@ public void testExpressionLanguageJarFile() throws IOException { @Test public void testJoltSpecEL() throws IOException { generateTestData(1, null); - final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrOutputSchema.avsc")); + final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/defaultrOutputSchema.avsc")); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); @@ -661,8 +647,7 @@ public void testJoltSpecEL() throws IOException { runner.setProperty(JoltTransformRecord.JOLT_SPEC, "${joltSpec}"); runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformStrategy.DEFAULTR); - final Map attributes = Collections.singletonMap("joltSpec", - "{\"RatingRange\":5,\"rating\":{\"*\":{\"MaxLabel\":\"High\",\"MinLabel\":\"Low\",\"DisplayType\":\"NORMAL\"}}}"); + final Map attributes = Collections.singletonMap("joltSpec", "{\"RatingRange\":5,\"rating\":{\"*\":{\"MaxLabel\":\"High\",\"MinLabel\":\"Low\",\"DisplayType\":\"NORMAL\"}}}"); runner.enqueue(new byte[0], attributes); runner.run(); @@ -670,8 +655,7 @@ public void testJoltSpecEL() throws IOException { runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).getFirst(); - assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrOutput.json")), - new String(transformed.toByteArray())); + assertEquals(Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/defaultrOutput.json")), new String(transformed.toByteArray())); } @Test @@ -683,63 +667,29 @@ public void testJoltSpecInvalidEL() { runner.assertNotValid(); } - @Test - public void testJoltComplexChoiceField() throws Exception { - final JsonTreeReader reader = new JsonTreeReader(); - runner.addControllerService("reader", reader); - runner.setProperty(reader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaInferenceUtil.INFER_SCHEMA); - runner.enableControllerService(reader); - runner.setProperty(JoltTransformRecord.RECORD_READER, "reader"); - - runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INHERIT_RECORD_SCHEMA); - runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); - runner.enableControllerService(writer); - - final String flattenSpec = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/flattenSpec.json")); - runner.setProperty(JoltTransformRecord.JOLT_SPEC, flattenSpec); - runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformStrategy.CHAINR); - - final String inputJson = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/input.json")); - runner.enqueue(inputJson); - - runner.run(); - runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1); - runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); - - final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).getFirst(); - assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/flattenedOutput.json")), - new String(transformed.toByteArray())); - } private static Stream getChainrArguments() { - return Stream.of( - Arguments.of(Paths.get(CHAINR_SPEC_PATH), "has no single line comments"), + return Stream.of(Arguments.of(Paths.get(CHAINR_SPEC_PATH), "has no single line comments"), Arguments.of(Paths.get("src/test/resources/specs/chainrSpecWithSingleLineComment.json"), "has a single line comment")); } - private void generateTestData(int numRecords, final BiFunction recordGenerator) { + protected void generateTestData(int numRecords, final BiFunction recordGenerator) { if (recordGenerator == null) { - final RecordSchema primarySchema = new SimpleRecordSchema(List.of( - new RecordField("value", RecordFieldType.INT.getDataType()))); - final RecordSchema seriesSchema = new SimpleRecordSchema(List.of( - new RecordField("value", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.INT.getDataType())))); - final RecordSchema qualitySchema = new SimpleRecordSchema(List.of( - new RecordField("value", RecordFieldType.INT.getDataType()))); - final RecordSchema ratingSchema = new SimpleRecordSchema(Arrays.asList( - new RecordField("primary", RecordFieldType.RECORD.getDataType()), - new RecordField("series", RecordFieldType.RECORD.getDataType()), - new RecordField("quality", RecordFieldType.RECORD.getDataType()) - )); + final RecordSchema primarySchema = new SimpleRecordSchema(List.of(new RecordField("value", RecordFieldType.INT.getDataType()))); + final RecordSchema seriesSchema = new SimpleRecordSchema(List.of(new RecordField("value", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.INT.getDataType())))); + final RecordSchema qualitySchema = new SimpleRecordSchema(List.of(new RecordField("value", RecordFieldType.INT.getDataType()))); + final RecordSchema ratingSchema = new SimpleRecordSchema( + Arrays.asList(new RecordField("primary", RecordFieldType.RECORD.getDataType()), new RecordField("series", RecordFieldType.RECORD.getDataType()), + new RecordField("quality", RecordFieldType.RECORD.getDataType()))); parser.addSchemaField("rating", RecordFieldType.RECORD); for (int i = 0; i < numRecords; i++) { final Record primaryRecord = new MapRecord(primarySchema, Map.of("value", (10 * i) + 3)); - final Record seriesRecord = new MapRecord(seriesSchema, Map.of("value", new Integer[]{(10 * i) + 5, (10 * i) + 4})); + final Record seriesRecord = new MapRecord(seriesSchema, Map.of("value", new Integer[] {(10 * i) + 5, (10 * i) + 4})); final Record qualityRecord = new MapRecord(qualitySchema, Map.of("value", 3)); - Record ratingRecord = new MapRecord(ratingSchema, Map.of("primary", primaryRecord, - "series", seriesRecord, "quality", qualityRecord)); + Record ratingRecord = new MapRecord(ratingSchema, Map.of("primary", primaryRecord, "series", seriesRecord, "quality", qualityRecord)); parser.addRecord(ratingRecord); } @@ -747,4 +697,4 @@ private void generateTestData(int numRecords, final BiFunction Date: Wed, 19 Nov 2025 14:34:15 +0100 Subject: [PATCH 2/8] Refactored test files back into original folder --- .../jolt/TestBaseJoltTransformRecord.java | 98 +++++++++--------- .../TestJoltTransformRecordPartitioned.java | 8 +- .../jolt/TestJoltTransformRecordUniform.java | 6 +- .../TestCustomJoltTransform.jar | Bin .../cardrOutput.json | 0 .../cardrOutputSchema.avsc | 0 .../chainrOutput.json | 0 .../chainrOutputSchema.avsc | 0 .../defaultrELOutput.json | 0 .../defaultrELOutputSchema.avsc | 0 .../defaultrOutput.json | 0 .../defaultrOutputSchema.avsc | 0 .../flattenSpec.json | 0 .../flattenedOutput.json | 0 .../input.json | 0 .../inputSchema.avsc | 0 .../modifierDefaultOutput.json | 0 .../modifierDefineOutput.json | 0 .../modifierDefineOutputSchema.avsc | 0 .../modifierOverwriteOutput.json | 0 .../modifierOverwriteOutputSchema.avsc | 0 .../multipleChainrOutput.json | 0 .../multipleChainrOutputSchema.avsc | 0 .../multipleChainrSpec.json | 0 .../multipleSchemasInput.json | 0 .../multipleSchemasOutput1.json | 0 .../multipleSchemasOutput2.json | 0 .../multipleSchemasSpec.json | 0 .../multipleToMultipleChainrOutput.json | 0 .../multipleToMultipleChainrOutputSchema.avsc | 0 .../multipleToMultipleChainrSpec.json | 0 .../removrOutput.json | 0 .../removrOutputSchema.avsc | 0 .../shiftrOutput.json | 0 .../shiftrOutputMultipleOutputRecords.json | 0 .../shiftrOutputSchema.avsc | 0 ...iftrOutputSchemaMultipleOutputRecords.avsc | 0 .../shiftrSpecMultipleOutputRecords.json | 0 .../sortrOutput.json | 0 .../sortrOutputSchema.avsc | 0 40 files changed, 56 insertions(+), 56 deletions(-) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestBaseJoltTransformRecord => TestJoltTransformRecord}/TestCustomJoltTransform.jar (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestBaseJoltTransformRecord => TestJoltTransformRecord}/cardrOutput.json (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestBaseJoltTransformRecord => TestJoltTransformRecord}/cardrOutputSchema.avsc (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestBaseJoltTransformRecord => TestJoltTransformRecord}/chainrOutput.json (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestBaseJoltTransformRecord => TestJoltTransformRecord}/chainrOutputSchema.avsc (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestBaseJoltTransformRecord => TestJoltTransformRecord}/defaultrELOutput.json (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestBaseJoltTransformRecord => TestJoltTransformRecord}/defaultrELOutputSchema.avsc (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestBaseJoltTransformRecord => TestJoltTransformRecord}/defaultrOutput.json (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestBaseJoltTransformRecord => TestJoltTransformRecord}/defaultrOutputSchema.avsc (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestBaseJoltTransformRecord => TestJoltTransformRecord}/flattenSpec.json (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestBaseJoltTransformRecord => TestJoltTransformRecord}/flattenedOutput.json (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestBaseJoltTransformRecord => TestJoltTransformRecord}/input.json (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestBaseJoltTransformRecord => TestJoltTransformRecord}/inputSchema.avsc (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestBaseJoltTransformRecord => TestJoltTransformRecord}/modifierDefaultOutput.json (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestBaseJoltTransformRecord => TestJoltTransformRecord}/modifierDefineOutput.json (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestBaseJoltTransformRecord => TestJoltTransformRecord}/modifierDefineOutputSchema.avsc (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestBaseJoltTransformRecord => TestJoltTransformRecord}/modifierOverwriteOutput.json (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestBaseJoltTransformRecord => TestJoltTransformRecord}/modifierOverwriteOutputSchema.avsc (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestBaseJoltTransformRecord => TestJoltTransformRecord}/multipleChainrOutput.json (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestBaseJoltTransformRecord => TestJoltTransformRecord}/multipleChainrOutputSchema.avsc (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestBaseJoltTransformRecord => TestJoltTransformRecord}/multipleChainrSpec.json (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestJoltTransformRecordPartitioned => TestJoltTransformRecord}/multipleSchemasInput.json (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestJoltTransformRecordPartitioned => TestJoltTransformRecord}/multipleSchemasOutput1.json (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestJoltTransformRecordPartitioned => TestJoltTransformRecord}/multipleSchemasOutput2.json (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestJoltTransformRecordPartitioned => TestJoltTransformRecord}/multipleSchemasSpec.json (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestBaseJoltTransformRecord => TestJoltTransformRecord}/multipleToMultipleChainrOutput.json (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestBaseJoltTransformRecord => TestJoltTransformRecord}/multipleToMultipleChainrOutputSchema.avsc (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestBaseJoltTransformRecord => TestJoltTransformRecord}/multipleToMultipleChainrSpec.json (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestBaseJoltTransformRecord => TestJoltTransformRecord}/removrOutput.json (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestBaseJoltTransformRecord => TestJoltTransformRecord}/removrOutputSchema.avsc (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestBaseJoltTransformRecord => TestJoltTransformRecord}/shiftrOutput.json (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestBaseJoltTransformRecord => TestJoltTransformRecord}/shiftrOutputMultipleOutputRecords.json (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestBaseJoltTransformRecord => TestJoltTransformRecord}/shiftrOutputSchema.avsc (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestBaseJoltTransformRecord => TestJoltTransformRecord}/shiftrOutputSchemaMultipleOutputRecords.avsc (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestBaseJoltTransformRecord => TestJoltTransformRecord}/shiftrSpecMultipleOutputRecords.json (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestBaseJoltTransformRecord => TestJoltTransformRecord}/sortrOutput.json (100%) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/{TestBaseJoltTransformRecord => TestJoltTransformRecord}/sortrOutputSchema.avsc (100%) diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestBaseJoltTransformRecord.java b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestBaseJoltTransformRecord.java index 3b548e26ac91..950566dc86a8 100644 --- a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestBaseJoltTransformRecord.java +++ b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestBaseJoltTransformRecord.java @@ -95,7 +95,7 @@ public void setup() throws Exception { @Test public void testRelationshipsCreated() throws IOException { generateTestData(1, null); - final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/chainrOutputSchema.avsc")); + final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/chainrOutputSchema.avsc")); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); @@ -112,7 +112,7 @@ public void testRelationshipsCreated() throws IOException { @Test public void testRelationshipsCreatedFromFile() throws IOException { generateTestData(1, null); - final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/chainrOutputSchema.avsc")); + final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/chainrOutputSchema.avsc")); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); @@ -129,7 +129,7 @@ public void testRelationshipsCreatedFromFile() throws IOException { @Test public void testInvalidJOLTSpec() throws IOException { generateTestData(1, null); - final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/shiftrOutputSchema.avsc")); + final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrOutputSchema.avsc")); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); @@ -158,7 +158,7 @@ public void testSpecIsNotSet() { @Test public void testSpecIsEmpty() throws IOException { generateTestData(1, null); - final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/shiftrOutputSchema.avsc")); + final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrOutputSchema.avsc")); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); @@ -171,7 +171,7 @@ public void testSpecIsEmpty() throws IOException { @Test public void testSpecNotRequired() throws IOException { generateTestData(1, null); - final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/shiftrOutputSchema.avsc")); + final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrOutputSchema.avsc")); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); @@ -183,7 +183,7 @@ public void testSpecNotRequired() throws IOException { @Test public void testNoFlowFileContent() throws IOException { generateTestData(1, null); - final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/chainrOutputSchema.avsc")); + final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/chainrOutputSchema.avsc")); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); @@ -198,7 +198,7 @@ public void testNoFlowFileContent() throws IOException { @Test public void testNoRecords() throws IOException { generateTestData(0, null); - final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/chainrOutputSchema.avsc")); + final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/chainrOutputSchema.avsc")); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); @@ -216,7 +216,7 @@ public void testNoRecords() throws IOException { public void testInvalidFlowFileContent() throws IOException { generateTestData(1, null); - final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/chainrOutputSchema.avsc")); + final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/chainrOutputSchema.avsc")); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); @@ -234,7 +234,7 @@ public void testInvalidFlowFileContent() throws IOException { @Test public void testCustomTransformationWithNoModule() throws IOException { generateTestData(1, null); - final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/chainrOutputSchema.avsc")); + final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/chainrOutputSchema.avsc")); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); @@ -249,12 +249,12 @@ public void testCustomTransformationWithNoModule() throws IOException { @Test public void testCustomTransformationWithMissingClassName() throws IOException { generateTestData(1, null); - final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/chainrOutputSchema.avsc")); + final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/chainrOutputSchema.avsc")); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); runner.enableControllerService(writer); - final String customJarPath = "src/test/resources/TestBaseJoltTransformRecord/TestCustomJoltTransform.jar"; + final String customJarPath = "src/test/resources/TestJoltTransformRecord/TestCustomJoltTransform.jar"; runner.setProperty(JoltTransformRecord.JOLT_SPEC, chainrSpecContents); runner.setProperty(JoltTransformRecord.MODULES, customJarPath); runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformStrategy.CUSTOMR); @@ -264,7 +264,7 @@ public void testCustomTransformationWithMissingClassName() throws IOException { @Test public void testCustomTransformationWithInvalidClassPath() { - final String customJarPath = "src/test/resources/TestBaseJoltTransformRecord/FakeCustomJar.jar"; + final String customJarPath = "src/test/resources/TestJoltTransformRecord/FakeCustomJar.jar"; runner.setProperty(JoltTransformRecord.JOLT_SPEC, chainrSpecContents); runner.setProperty(JoltTransformRecord.CUSTOM_CLASS, "TestCustomJoltTransform"); runner.setProperty(JoltTransformRecord.MODULES, customJarPath); @@ -275,7 +275,7 @@ public void testCustomTransformationWithInvalidClassPath() { @Test public void testCustomTransformationWithInvalidClassName() { - final String customJarPath = "src/test/resources/TestBaseJoltTransformRecord/TestCustomJoltTransform.jar"; + final String customJarPath = "src/test/resources/TestJoltTransformRecord/TestCustomJoltTransform.jar"; runner.setProperty(JoltTransformRecord.JOLT_SPEC, chainrSpecContents); runner.setProperty(JoltTransformRecord.CUSTOM_CLASS, "FakeCustomJoltTransform"); runner.setProperty(JoltTransformRecord.MODULES, customJarPath); @@ -288,7 +288,7 @@ public void testCustomTransformationWithInvalidClassName() { @MethodSource("getChainrArguments") public void testTransformInputWithChainr(Path specPath, String ignoredDescription) throws IOException { generateTestData(1, null); - final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/chainrOutputSchema.avsc")); + final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/chainrOutputSchema.avsc")); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); @@ -302,13 +302,13 @@ public void testTransformInputWithChainr(Path specPath, String ignoredDescriptio final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).getFirst(); transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key()); transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json"); - assertEquals(Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/chainrOutput.json")), new String(transformed.toByteArray())); + assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/chainrOutput.json")), new String(transformed.toByteArray())); } @Test public void testTransformInputWithShiftr() throws IOException { generateTestData(1, null); - final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/shiftrOutputSchema.avsc")); + final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrOutputSchema.avsc")); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); @@ -323,13 +323,13 @@ public void testTransformInputWithShiftr() throws IOException { final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).getFirst(); transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key()); transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json"); - assertEquals(Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/shiftrOutput.json")), new String(transformed.toByteArray())); + assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrOutput.json")), new String(transformed.toByteArray())); } @Test public void testTransformInputWithShiftrAccentedChars() throws IOException { generateTestData(1, null); - final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/shiftrOutputSchema.avsc")); + final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrOutputSchema.avsc")); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); @@ -344,7 +344,7 @@ public void testTransformInputWithShiftrAccentedChars() throws IOException { final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).getFirst(); transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key()); transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json"); - assertEquals(Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/shiftrOutput.json")), new String(transformed.toByteArray())); + assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrOutput.json")), new String(transformed.toByteArray())); } @Test @@ -368,12 +368,12 @@ public void testTransformInputWithShiftrMultipleOutputRecords() throws IOExcepti final Object[] recordArray2 = new Object[] {record4, record5}; parser.addRecord((Object) recordArray2); - final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/shiftrOutputSchemaMultipleOutputRecords.avsc")); + final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrOutputSchemaMultipleOutputRecords.avsc")); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); runner.enableControllerService(writer); - final String spec = Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/shiftrSpecMultipleOutputRecords.json")); + final String spec = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrSpecMultipleOutputRecords.json")); runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec); runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformStrategy.SHIFTR); runner.enqueue(new byte[0]); @@ -383,13 +383,13 @@ public void testTransformInputWithShiftrMultipleOutputRecords() throws IOExcepti final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).getFirst(); transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key()); transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json"); - assertEquals(Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/shiftrOutputMultipleOutputRecords.json")), new String(transformed.toByteArray())); + assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrOutputMultipleOutputRecords.json")), new String(transformed.toByteArray())); } @Test public void testTransformInputWithShiftrFromFile() throws IOException { generateTestData(1, null); - final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/shiftrOutputSchema.avsc")); + final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrOutputSchema.avsc")); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); @@ -404,13 +404,13 @@ public void testTransformInputWithShiftrFromFile() throws IOException { final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).getFirst(); transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key()); transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json"); - assertEquals(Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/shiftrOutput.json")), new String(transformed.toByteArray())); + assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrOutput.json")), new String(transformed.toByteArray())); } @Test public void testTransformInputWithDefaultr() throws IOException { generateTestData(1, null); - final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/defaultrOutputSchema.avsc")); + final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrOutputSchema.avsc")); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); @@ -423,13 +423,13 @@ public void testTransformInputWithDefaultr() throws IOException { runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1); runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).getFirst(); - assertEquals(Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/defaultrOutput.json")), new String(transformed.toByteArray())); + assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrOutput.json")), new String(transformed.toByteArray())); } @Test public void testTransformInputWithRemovr() throws IOException { generateTestData(1, null); - final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/removrOutputSchema.avsc")); + final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/removrOutputSchema.avsc")); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); @@ -442,14 +442,14 @@ public void testTransformInputWithRemovr() throws IOException { runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1); runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).getFirst(); - assertEquals(Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/removrOutput.json")), new String(transformed.toByteArray())); + assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/removrOutput.json")), new String(transformed.toByteArray())); } @Test public void testTransformInputWithCardinality() throws IOException { generateTestData(1, null); - final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/cardrOutputSchema.avsc")); + final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/cardrOutputSchema.avsc")); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); @@ -462,14 +462,14 @@ public void testTransformInputWithCardinality() throws IOException { runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1); runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).getFirst(); - assertEquals(Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/cardrOutput.json")), new String(transformed.toByteArray())); + assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/cardrOutput.json")), new String(transformed.toByteArray())); } @Test public void testTransformInputWithSortr() throws IOException { generateTestData(1, null); - final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/sortrOutputSchema.avsc")); + final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/sortrOutputSchema.avsc")); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); @@ -482,13 +482,13 @@ public void testTransformInputWithSortr() throws IOException { final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).getFirst(); transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key()); transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json"); - assertEquals(Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/sortrOutput.json")), new String(transformed.toByteArray())); + assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/sortrOutput.json")), new String(transformed.toByteArray())); } @Test public void testTransformInputWithDefaultrExpressionLanguage() throws IOException { generateTestData(1, null); - final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/defaultrELOutputSchema.avsc")); + final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrELOutputSchema.avsc")); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); @@ -502,7 +502,7 @@ public void testTransformInputWithDefaultrExpressionLanguage() throws IOExceptio runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1); runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).getFirst(); - assertEquals(Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/defaultrELOutput.json")), new String(transformed.toByteArray())); + assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrELOutput.json")), new String(transformed.toByteArray())); } @@ -510,7 +510,7 @@ public void testTransformInputWithDefaultrExpressionLanguage() throws IOExceptio public void testTransformInputWithModifierDefault() throws IOException { generateTestData(1, null); // Input schema = output schema, just modifying values - final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/inputSchema.avsc")); + final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/inputSchema.avsc")); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); @@ -522,13 +522,13 @@ public void testTransformInputWithModifierDefault() throws IOException { runner.run(); runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1); final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).getFirst(); - assertEquals(Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/modifierDefaultOutput.json")), new String(transformed.toByteArray())); + assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/modifierDefaultOutput.json")), new String(transformed.toByteArray())); } @Test public void testTransformInputWithModifierDefine() throws IOException { generateTestData(1, null); - final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/modifierDefineOutputSchema.avsc")); + final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/modifierDefineOutputSchema.avsc")); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); @@ -541,13 +541,13 @@ public void testTransformInputWithModifierDefine() throws IOException { runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1); runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).getFirst(); - assertEquals(Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/modifierDefineOutput.json")), new String(transformed.toByteArray())); + assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/modifierDefineOutput.json")), new String(transformed.toByteArray())); } @Test public void testTransformInputWithModifierOverwrite() throws IOException { generateTestData(1, null); - final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/modifierOverwriteOutputSchema.avsc")); + final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/modifierOverwriteOutputSchema.avsc")); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); @@ -560,13 +560,13 @@ public void testTransformInputWithModifierOverwrite() throws IOException { runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1); runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).getFirst(); - assertEquals(Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/modifierOverwriteOutput.json")), new String(transformed.toByteArray())); + assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/modifierOverwriteOutput.json")), new String(transformed.toByteArray())); } @Test public void testTransformInputWithSortrPopulatedSpec() throws IOException { generateTestData(1, null); - final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/sortrOutputSchema.avsc")); + final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/sortrOutputSchema.avsc")); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); @@ -580,18 +580,18 @@ public void testTransformInputWithSortrPopulatedSpec() throws IOException { final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).getFirst(); transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key()); transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json"); - assertEquals(Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/sortrOutput.json")), new String(transformed.toByteArray())); + assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/sortrOutput.json")), new String(transformed.toByteArray())); } @Test public void testTransformInputCustomTransformationIgnored() throws IOException { generateTestData(1, null); - final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/defaultrOutputSchema.avsc")); + final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrOutputSchema.avsc")); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); runner.enableControllerService(writer); - final String customJarPath = "src/test/resources/TestBaseJoltTransformRecord/TestCustomJoltTransform.jar"; + final String customJarPath = "src/test/resources/TestJoltTransformRecord/TestCustomJoltTransform.jar"; final String spec = Files.readString(Paths.get("src/test/resources/specs/defaultrSpec.json")); runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec); runner.setProperty(JoltTransformRecord.CUSTOM_CLASS, "TestCustomJoltTransform"); @@ -604,18 +604,18 @@ public void testTransformInputCustomTransformationIgnored() throws IOException { final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).getFirst(); transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key()); transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json"); - assertEquals(Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/defaultrOutput.json")), new String(transformed.toByteArray())); + assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrOutput.json")), new String(transformed.toByteArray())); } @Test public void testExpressionLanguageJarFile() throws IOException { generateTestData(1, null); - final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/defaultrOutputSchema.avsc")); + final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrOutputSchema.avsc")); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); runner.enableControllerService(writer); - URL t = getClass().getResource("/TestBaseJoltTransformRecord/TestCustomJoltTransform.jar"); + URL t = getClass().getResource("/TestJoltTransformRecord/TestCustomJoltTransform.jar"); assertNotNull(t); final String customJarPath = t.getPath(); final String spec = Files.readString(Paths.get("src/test/resources/specs/customChainrSpec.json")); @@ -637,7 +637,7 @@ public void testExpressionLanguageJarFile() throws IOException { @Test public void testJoltSpecEL() throws IOException { generateTestData(1, null); - final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/defaultrOutputSchema.avsc")); + final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrOutputSchema.avsc")); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); @@ -655,7 +655,7 @@ public void testJoltSpecEL() throws IOException { runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).getFirst(); - assertEquals(Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/defaultrOutput.json")), new String(transformed.toByteArray())); + assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrOutput.json")), new String(transformed.toByteArray())); } @Test diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestJoltTransformRecordPartitioned.java b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestJoltTransformRecordPartitioned.java index b355edaafae4..7589184e0952 100644 --- a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestJoltTransformRecordPartitioned.java +++ b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestJoltTransformRecordPartitioned.java @@ -49,11 +49,11 @@ public void testTransformInputWithDifferentSchemas() throws InitializationExcept runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); runner.enableControllerService(writer); - final String flattenSpec = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecordPartitioned/multipleSchemasSpec.json")); + final String flattenSpec = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/multipleSchemasSpec.json")); runner.setProperty(JoltTransformRecord.JOLT_SPEC, flattenSpec); runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformStrategy.CHAINR); - final String inputJson = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecordPartitioned/multipleSchemasInput.json")); + final String inputJson = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/multipleSchemasInput.json")); runner.enqueue(inputJson); runner.run(); @@ -61,9 +61,9 @@ public void testTransformInputWithDifferentSchemas() throws InitializationExcept runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 2); runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); - assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecordPartitioned/multipleSchemasOutput1.json")), + assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/multipleSchemasOutput1.json")), new String(runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0).toByteArray())); - assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecordPartitioned/multipleSchemasOutput2.json")), + assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/multipleSchemasOutput2.json")), new String(runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(1).toByteArray())); } } \ No newline at end of file diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestJoltTransformRecordUniform.java b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestJoltTransformRecordUniform.java index 76cab00e4532..1789ce752ef1 100644 --- a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestJoltTransformRecordUniform.java +++ b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestJoltTransformRecordUniform.java @@ -48,11 +48,11 @@ public void testJoltComplexChoiceField() throws Exception { runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); runner.enableControllerService(writer); - final String flattenSpec = Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/flattenSpec.json")); + final String flattenSpec = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/flattenSpec.json")); runner.setProperty(JoltTransformRecord.JOLT_SPEC, flattenSpec); runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformStrategy.CHAINR); - final String inputJson = Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/input.json")); + final String inputJson = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/input.json")); runner.enqueue(inputJson); runner.run(); @@ -61,6 +61,6 @@ public void testJoltComplexChoiceField() throws Exception { runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).getFirst(); - assertEquals(Files.readString(Paths.get("src/test/resources/TestBaseJoltTransformRecord/flattenedOutput.json")), new String(transformed.toByteArray())); + assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/flattenedOutput.json")), new String(transformed.toByteArray())); } } \ No newline at end of file diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/TestCustomJoltTransform.jar b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/TestCustomJoltTransform.jar similarity index 100% rename from nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/TestCustomJoltTransform.jar rename to nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/TestCustomJoltTransform.jar diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/cardrOutput.json b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/cardrOutput.json similarity index 100% rename from nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/cardrOutput.json rename to nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/cardrOutput.json diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/cardrOutputSchema.avsc b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/cardrOutputSchema.avsc similarity index 100% rename from nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/cardrOutputSchema.avsc rename to nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/cardrOutputSchema.avsc diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/chainrOutput.json b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/chainrOutput.json similarity index 100% rename from nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/chainrOutput.json rename to nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/chainrOutput.json diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/chainrOutputSchema.avsc b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/chainrOutputSchema.avsc similarity index 100% rename from nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/chainrOutputSchema.avsc rename to nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/chainrOutputSchema.avsc diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/defaultrELOutput.json b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/defaultrELOutput.json similarity index 100% rename from nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/defaultrELOutput.json rename to nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/defaultrELOutput.json diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/defaultrELOutputSchema.avsc b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/defaultrELOutputSchema.avsc similarity index 100% rename from nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/defaultrELOutputSchema.avsc rename to nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/defaultrELOutputSchema.avsc diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/defaultrOutput.json b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/defaultrOutput.json similarity index 100% rename from nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/defaultrOutput.json rename to nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/defaultrOutput.json diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/defaultrOutputSchema.avsc b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/defaultrOutputSchema.avsc similarity index 100% rename from nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/defaultrOutputSchema.avsc rename to nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/defaultrOutputSchema.avsc diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/flattenSpec.json b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/flattenSpec.json similarity index 100% rename from nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/flattenSpec.json rename to nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/flattenSpec.json diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/flattenedOutput.json b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/flattenedOutput.json similarity index 100% rename from nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/flattenedOutput.json rename to nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/flattenedOutput.json diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/input.json b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/input.json similarity index 100% rename from nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/input.json rename to nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/input.json diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/inputSchema.avsc b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/inputSchema.avsc similarity index 100% rename from nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/inputSchema.avsc rename to nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/inputSchema.avsc diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/modifierDefaultOutput.json b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/modifierDefaultOutput.json similarity index 100% rename from nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/modifierDefaultOutput.json rename to nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/modifierDefaultOutput.json diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/modifierDefineOutput.json b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/modifierDefineOutput.json similarity index 100% rename from nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/modifierDefineOutput.json rename to nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/modifierDefineOutput.json diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/modifierDefineOutputSchema.avsc b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/modifierDefineOutputSchema.avsc similarity index 100% rename from nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/modifierDefineOutputSchema.avsc rename to nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/modifierDefineOutputSchema.avsc diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/modifierOverwriteOutput.json b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/modifierOverwriteOutput.json similarity index 100% rename from nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/modifierOverwriteOutput.json rename to nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/modifierOverwriteOutput.json diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/modifierOverwriteOutputSchema.avsc b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/modifierOverwriteOutputSchema.avsc similarity index 100% rename from nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/modifierOverwriteOutputSchema.avsc rename to nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/modifierOverwriteOutputSchema.avsc diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/multipleChainrOutput.json b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/multipleChainrOutput.json similarity index 100% rename from nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/multipleChainrOutput.json rename to nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/multipleChainrOutput.json diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/multipleChainrOutputSchema.avsc b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/multipleChainrOutputSchema.avsc similarity index 100% rename from nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/multipleChainrOutputSchema.avsc rename to nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/multipleChainrOutputSchema.avsc diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/multipleChainrSpec.json b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/multipleChainrSpec.json similarity index 100% rename from nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/multipleChainrSpec.json rename to nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/multipleChainrSpec.json diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecordPartitioned/multipleSchemasInput.json b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/multipleSchemasInput.json similarity index 100% rename from nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecordPartitioned/multipleSchemasInput.json rename to nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/multipleSchemasInput.json diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecordPartitioned/multipleSchemasOutput1.json b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/multipleSchemasOutput1.json similarity index 100% rename from nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecordPartitioned/multipleSchemasOutput1.json rename to nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/multipleSchemasOutput1.json diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecordPartitioned/multipleSchemasOutput2.json b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/multipleSchemasOutput2.json similarity index 100% rename from nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecordPartitioned/multipleSchemasOutput2.json rename to nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/multipleSchemasOutput2.json diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecordPartitioned/multipleSchemasSpec.json b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/multipleSchemasSpec.json similarity index 100% rename from nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecordPartitioned/multipleSchemasSpec.json rename to nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/multipleSchemasSpec.json diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/multipleToMultipleChainrOutput.json b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/multipleToMultipleChainrOutput.json similarity index 100% rename from nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/multipleToMultipleChainrOutput.json rename to nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/multipleToMultipleChainrOutput.json diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/multipleToMultipleChainrOutputSchema.avsc b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/multipleToMultipleChainrOutputSchema.avsc similarity index 100% rename from nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/multipleToMultipleChainrOutputSchema.avsc rename to nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/multipleToMultipleChainrOutputSchema.avsc diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/multipleToMultipleChainrSpec.json b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/multipleToMultipleChainrSpec.json similarity index 100% rename from nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/multipleToMultipleChainrSpec.json rename to nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/multipleToMultipleChainrSpec.json diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/removrOutput.json b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/removrOutput.json similarity index 100% rename from nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/removrOutput.json rename to nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/removrOutput.json diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/removrOutputSchema.avsc b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/removrOutputSchema.avsc similarity index 100% rename from nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/removrOutputSchema.avsc rename to nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/removrOutputSchema.avsc diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/shiftrOutput.json b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/shiftrOutput.json similarity index 100% rename from nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/shiftrOutput.json rename to nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/shiftrOutput.json diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/shiftrOutputMultipleOutputRecords.json b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/shiftrOutputMultipleOutputRecords.json similarity index 100% rename from nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/shiftrOutputMultipleOutputRecords.json rename to nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/shiftrOutputMultipleOutputRecords.json diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/shiftrOutputSchema.avsc b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/shiftrOutputSchema.avsc similarity index 100% rename from nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/shiftrOutputSchema.avsc rename to nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/shiftrOutputSchema.avsc diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/shiftrOutputSchemaMultipleOutputRecords.avsc b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/shiftrOutputSchemaMultipleOutputRecords.avsc similarity index 100% rename from nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/shiftrOutputSchemaMultipleOutputRecords.avsc rename to nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/shiftrOutputSchemaMultipleOutputRecords.avsc diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/shiftrSpecMultipleOutputRecords.json b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/shiftrSpecMultipleOutputRecords.json similarity index 100% rename from nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/shiftrSpecMultipleOutputRecords.json rename to nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/shiftrSpecMultipleOutputRecords.json diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/sortrOutput.json b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/sortrOutput.json similarity index 100% rename from nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/sortrOutput.json rename to nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/sortrOutput.json diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/sortrOutputSchema.avsc b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/sortrOutputSchema.avsc similarity index 100% rename from nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestBaseJoltTransformRecord/sortrOutputSchema.avsc rename to nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/sortrOutputSchema.avsc From a9a33e01af77c30e0a0c7900634c1c240c87f8be Mon Sep 17 00:00:00 2001 From: Jordan Sammut Date: Wed, 19 Nov 2025 14:44:13 +0100 Subject: [PATCH 3/8] Reverted some formatting so as to leave PR cleaner --- .../processors/jolt/JoltTransformRecord.java | 96 ++++++++++++------- 1 file changed, 64 insertions(+), 32 deletions(-) diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformRecord.java b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformRecord.java index 270e5a10aa03..1ddc359999c5 100644 --- a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformRecord.java +++ b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformRecord.java @@ -65,37 +65,67 @@ @SupportsBatching @Tags({"record", "jolt", "transform", "shiftr", "chainr", "defaultr", "removr", "cardinality", "sort"}) @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) -@WritesAttributes({@WritesAttribute(attribute = "record.count", description = "The number of records in an outgoing FlowFile"), - @WritesAttribute(attribute = "mime.type", description = "The MIME " + "Type that the configured Record Writer indicates is appropriate")}) -@CapabilityDescription("Applies a JOLT specification to each record in the FlowFile payload. A new FlowFile is created " + "with transformed content and is routed to the 'success' relationship. If " + - "the transform " + "fails, the original FlowFile is routed to the 'failure' relationship.") +@WritesAttributes({ + @WritesAttribute(attribute = "record.count", description = "The number of records in an outgoing FlowFile"), + @WritesAttribute(attribute = "mime.type", description = "The MIME Type that the configured Record Writer indicates is appropriate"), +}) +@CapabilityDescription("Applies a JOLT specification to each record in the FlowFile payload. A new FlowFile is created " + + "with transformed content and is routed to the 'success' relationship. If the transform " + + "fails, the original FlowFile is routed to the 'failure' relationship.") @RequiresInstanceClassLoading public class JoltTransformRecord extends AbstractJoltTransform { - static final PropertyDescriptor RECORD_READER = - new PropertyDescriptor.Builder().name("Record Reader").description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.") - .identifiesControllerService(RecordReaderFactory.class).required(true).build(); - - static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder().name("Record Writer").description("Specifies the Controller Service to use for writing out the records") - .identifiesControllerService(RecordSetWriterFactory.class).required(true).build(); - - static final PropertyDescriptor SCHEMA_WRITING_STRATEGY = - new PropertyDescriptor.Builder().name("Schema Writing Strategy").description("Specifies how the processor should handle records that result in different schemas after transformation.") - .allowableValues(JoltTransformWritingStrategy.class).defaultValue(JoltTransformWritingStrategy.USE_FIRST_SCHEMA).required(true).build(); - - static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("The FlowFile with transformed content will be routed to this relationship").build(); - - static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") - .description("If a FlowFile fails processing for any reason (for example, the FlowFile records cannot be " + "parsed), it will be routed to this relationship").build(); - - static final Relationship REL_ORIGINAL = - new Relationship.Builder().name("original").description("The original FlowFile that was transformed. If the FlowFile fails processing, nothing will be " + "sent to this relationship") - .build(); - - private static final List PROPERTY_DESCRIPTORS = - Stream.concat(getCommonPropertyDescriptors().stream(), Stream.of(SCHEMA_WRITING_STRATEGY, RECORD_READER, RECORD_WRITER)).toList(); - - private static final Set RELATIONSHIPS = Set.of(REL_SUCCESS, REL_FAILURE, REL_ORIGINAL); + static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder() + .name("Record Reader") + .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.") + .identifiesControllerService(RecordReaderFactory.class) + .required(true) + .build(); + + static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder() + .name("Record Writer") + .description("Specifies the Controller Service to use for writing out the records") + .identifiesControllerService(RecordSetWriterFactory.class) + .required(true) + .build(); + + static final PropertyDescriptor SCHEMA_WRITING_STRATEGY = new PropertyDescriptor.Builder() + .name("Schema Writing Strategy") + .description("Specifies how the processor should handle records that result in different schemas after transformation.") + .allowableValues(JoltTransformWritingStrategy.class) + .defaultValue(JoltTransformWritingStrategy.USE_FIRST_SCHEMA) + .required(true) + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("The FlowFile with transformed content will be routed to this relationship") + .build(); + + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("If a FlowFile fails processing for any reason (for example, the FlowFile records cannot be parsed), it will be routed to this relationship") + .build(); + + static final Relationship REL_ORIGINAL = new Relationship.Builder() + .name("original") + .description("The original FlowFile that was transformed. If the FlowFile fails processing, nothing will be sent to this relationship") + .build(); + + private static final List PROPERTY_DESCRIPTORS = Stream.concat( + getCommonPropertyDescriptors().stream(), + Stream.of( + SCHEMA_WRITING_STRATEGY, + RECORD_READER, + RECORD_WRITER + ) + ).toList(); + + private static final Set RELATIONSHIPS = Set.of( + REL_SUCCESS, + REL_FAILURE, + REL_ORIGINAL + ); @Override public Set getRelationships() { @@ -334,7 +364,6 @@ public void migrateProperties(PropertyConfiguration config) { private List transform(final Record record, final JoltTransform transform) { Map recordMap = (Map) DataTypeUtils.convertRecordFieldtoObject(record, RecordFieldType.RECORD.getRecordDataType(record.getSchema())); - // JOLT expects arrays to be of type List where our Record code uses Object[]. // Make another pass of the transformed objects to change Object[] to List. recordMap = (Map) normalizeJoltObjects(recordMap); @@ -349,7 +378,8 @@ private List transform(final Record record, final JoltTransform transfor return recordList; } - // If the top-level object is an array, return a list of the records inside. Otherwise return a singleton list with the single transformed record + // If the top-level object is an array, return a list of the records inside. + // Otherwise return a singleton list with the single transformed record if (normalizedRecordValues instanceof Object[]) { for (Object o : (Object[]) normalizedRecordValues) { if (o != null) { @@ -364,11 +394,13 @@ private List transform(final Record record, final JoltTransform transfor protected static Object transform(JoltTransform joltTransform, Object input) { return joltTransform instanceof ContextualTransform - ? ((ContextualTransform) joltTransform).transform(input, Collections.emptyMap()) : ((Transform) joltTransform).transform(input); + ? + ((ContextualTransform) joltTransform).transform(input, Collections.emptyMap()) : ((Transform) joltTransform).transform(input); } /** - * Recursively replace List objects with Object[]. JOLT expects arrays to be of type List where our Record code uses Object[]. + * Recursively replace List objects with Object[]. + * JOLT expects arrays to be of type List where our Record code uses Object[]. * * @param o The object to normalize with respect to JOLT */ From f94ece1a67852b3993b457d927199bffd7dea991 Mon Sep 17 00:00:00 2001 From: Jordan Sammut Date: Sat, 22 Nov 2025 21:44:29 +0100 Subject: [PATCH 4/8] NIFI-15209 - Added new test for jolt which filters out everything - Cleaned up some irrelevant code - Fix for non-deterministic ordering in test --- .../processors/jolt/JoltTransformRecord.java | 4 ++-- .../jolt/TestBaseJoltTransformRecord.java | 19 +++++++++++++++++++ .../TestJoltTransformRecordPartitioned.java | 19 +++++++++++++++---- .../TestJoltTransformRecord/filterOutAll.json | 8 ++++++++ 4 files changed, 44 insertions(+), 6 deletions(-) create mode 100644 nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/filterOutAll.json diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformRecord.java b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformRecord.java index 1ddc359999c5..cb42a36484d4 100644 --- a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformRecord.java +++ b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformRecord.java @@ -286,7 +286,7 @@ private void processUniform(final ProcessContext context, final ProcessSession s while ((currentRecord = reader.nextRecord()) != null) { List transformedRecords = transform(currentRecord, transform); - if (transformedRecords != null && !transformedRecords.isEmpty() && transformedRecords.getFirst() != null) { + if (!transformedRecords.isEmpty() && transformedRecords.getFirst() != null) { firstValidBatch = transformedRecords; firstValidRecord = transformedRecords.getFirst(); break; @@ -321,7 +321,7 @@ private void processUniform(final ProcessContext context, final ProcessSession s // Write the rest while ((currentRecord = reader.nextRecord()) != null) { final List transformedRecords = transform(currentRecord, transform); - if (transformedRecords != null && !transformedRecords.isEmpty()) { + if (!transformedRecords.isEmpty()) { for (Record r : transformedRecords) { if (r != null) writer.write(r); } diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestBaseJoltTransformRecord.java b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestBaseJoltTransformRecord.java index 950566dc86a8..bd313a874805 100644 --- a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestBaseJoltTransformRecord.java +++ b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestBaseJoltTransformRecord.java @@ -667,6 +667,25 @@ public void testJoltSpecInvalidEL() { runner.assertNotValid(); } + @Test + public void testTransformInputAllFiltered() throws IOException { + generateTestData(3, null); + + runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INHERIT_RECORD_SCHEMA); + runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); + runner.enableControllerService(writer); + + final String flattenSpec = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/filterOutAll.json")); + runner.setProperty(JoltTransformRecord.JOLT_SPEC, flattenSpec); + runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformStrategy.CHAINR); + + runner.enqueue(new byte[0]); + runner.run(); + runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 0); + runner.assertTransferCount(JoltTransformRecord.REL_FAILURE, 0); + runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); + } + private static Stream getChainrArguments() { return Stream.of(Arguments.of(Paths.get(CHAINR_SPEC_PATH), "has no single line comments"), diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestJoltTransformRecordPartitioned.java b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestJoltTransformRecordPartitioned.java index 7589184e0952..ea7ffbbfaa13 100644 --- a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestJoltTransformRecordPartitioned.java +++ b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestJoltTransformRecordPartitioned.java @@ -61,9 +61,20 @@ public void testTransformInputWithDifferentSchemas() throws InitializationExcept runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 2); runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); - assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/multipleSchemasOutput1.json")), - new String(runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0).toByteArray())); - assertEquals(Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/multipleSchemasOutput2.json")), - new String(runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(1).toByteArray())); + final String expectedOutput1 = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/multipleSchemasOutput1.json")); + final String expectedOutput2 = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/multipleSchemasOutput2.json")); + + final java.util.List flowFiles = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS); + final String result1 = new String(flowFiles.get(0).toByteArray()); + final String result2 = new String(flowFiles.get(1).toByteArray()); + + // Handles non-deterministic order + if (result1.contains("TRASH")) { + assertEquals(expectedOutput1, result1); + assertEquals(expectedOutput2, result2); + } else { + assertEquals(expectedOutput2, result1); + assertEquals(expectedOutput1, result2); + } } } \ No newline at end of file diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/filterOutAll.json b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/filterOutAll.json new file mode 100644 index 000000000000..397d8b5193cc --- /dev/null +++ b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/filterOutAll.json @@ -0,0 +1,8 @@ +[ + { + "operation": "shift", + "spec": { + "key": "value" + } + } +] \ No newline at end of file From 0e8cceaaf665b4eebbe6de5d58a16803275fa6b8 Mon Sep 17 00:00:00 2001 From: Jordan Sammut Date: Sun, 23 Nov 2025 15:45:50 +0100 Subject: [PATCH 5/8] NIFI-15209 Disabling checks on Windows --- .../nifi/processors/jolt/TestBaseJoltTransformRecord.java | 3 --- .../processors/jolt/TestJoltTransformRecordPartitioned.java | 3 +++ .../nifi/processors/jolt/TestJoltTransformRecordUniform.java | 3 +++ 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestBaseJoltTransformRecord.java b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestBaseJoltTransformRecord.java index bd313a874805..a65ab53a6d91 100644 --- a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestBaseJoltTransformRecord.java +++ b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestBaseJoltTransformRecord.java @@ -35,8 +35,6 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.condition.DisabledOnOs; -import org.junit.jupiter.api.condition.OS; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -59,7 +57,6 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; -@DisabledOnOs(OS.WINDOWS) //The pretty printed json comparisons don't work on windows public abstract class TestBaseJoltTransformRecord { final static String CHAINR_SPEC_PATH = "src/test/resources/specs/chainrSpec.json"; diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestJoltTransformRecordPartitioned.java b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestJoltTransformRecordPartitioned.java index ea7ffbbfaa13..5bcfe6f4c96a 100644 --- a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestJoltTransformRecordPartitioned.java +++ b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestJoltTransformRecordPartitioned.java @@ -23,6 +23,8 @@ import org.apache.nifi.schema.access.SchemaAccessUtils; import org.apache.nifi.schema.inference.SchemaInferenceUtil; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.DisabledOnOs; +import org.junit.jupiter.api.condition.OS; import java.io.IOException; import java.nio.file.Files; @@ -30,6 +32,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; +@DisabledOnOs(OS.WINDOWS) //The pretty printed json comparisons don't work on windows public class TestJoltTransformRecordPartitioned extends TestBaseJoltTransformRecord { @Override diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestJoltTransformRecordUniform.java b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestJoltTransformRecordUniform.java index 1789ce752ef1..d951d5f5274b 100644 --- a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestJoltTransformRecordUniform.java +++ b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestJoltTransformRecordUniform.java @@ -23,12 +23,15 @@ import org.apache.nifi.schema.inference.SchemaInferenceUtil; import org.apache.nifi.util.MockFlowFile; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.DisabledOnOs; +import org.junit.jupiter.api.condition.OS; import java.nio.file.Files; import java.nio.file.Paths; import static org.junit.jupiter.api.Assertions.assertEquals; +@DisabledOnOs(OS.WINDOWS) //The pretty printed json comparisons don't work on windows public class TestJoltTransformRecordUniform extends TestBaseJoltTransformRecord { @Override From 838c7de4273c8571dcccbd65da9c9286efc173f8 Mon Sep 17 00:00:00 2001 From: Jordan Sammut Date: Sun, 23 Nov 2025 15:53:31 +0100 Subject: [PATCH 6/8] NIFI-15209 Test file rename --- .../nifi/processors/jolt/TestBaseJoltTransformRecord.java | 2 +- .../{filterOutAll.json => filterOutAllSpec.json} | 0 2 files changed, 1 insertion(+), 1 deletion(-) rename nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/{filterOutAll.json => filterOutAllSpec.json} (100%) diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestBaseJoltTransformRecord.java b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestBaseJoltTransformRecord.java index a65ab53a6d91..b1a775d15cb7 100644 --- a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestBaseJoltTransformRecord.java +++ b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestBaseJoltTransformRecord.java @@ -672,7 +672,7 @@ public void testTransformInputAllFiltered() throws IOException { runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); runner.enableControllerService(writer); - final String flattenSpec = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/filterOutAll.json")); + final String flattenSpec = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/filterOutAllSpec.json")); runner.setProperty(JoltTransformRecord.JOLT_SPEC, flattenSpec); runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformStrategy.CHAINR); diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/filterOutAll.json b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/filterOutAllSpec.json similarity index 100% rename from nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/filterOutAll.json rename to nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/filterOutAllSpec.json From e67e9fdede92d8b7e38ddd0a77eb3d053b13fdd4 Mon Sep 17 00:00:00 2001 From: Jordan Sammut Date: Mon, 24 Nov 2025 13:03:17 +0100 Subject: [PATCH 7/8] NIFI-15209 Some minor fixes for JoltTransformRecord. Inserted creation of new flowfile after we check that we have a valid record --- .../apache/nifi/processors/jolt/JoltTransformRecord.java | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformRecord.java b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformRecord.java index cb42a36484d4..00a16544d6e5 100644 --- a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformRecord.java +++ b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformRecord.java @@ -293,19 +293,16 @@ private void processUniform(final ProcessContext context, final ProcessSession s } } - transformed = session.create(original); final WriteResult writeResult; final Map attributes = new HashMap<>(); if (firstValidRecord == null) { // UPDATED LOGIC: // All records were filtered out (or input was empty). - // The test expects 0 output files. We must remove the FlowFile we created - // and ensure 'transformed' is null so it isn't transferred to SUCCESS later. - session.remove(transformed); - transformed = null; logger.info("{} had no Records to transform", original); } else { + transformed = session.create(original); + // We have at least one valid record, initialize writer with its schema final RecordSchema writeSchema = writerFactory.getSchema(original.getAttributes(), firstValidRecord.getSchema()); From c031c92507a4b6745191594231c89cb02edff835 Mon Sep 17 00:00:00 2001 From: Jordan Sammut Date: Mon, 24 Nov 2025 14:25:55 +0100 Subject: [PATCH 8/8] NIFI-15209 Closing of writer --- .../apache/nifi/processors/jolt/JoltTransformRecord.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformRecord.java b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformRecord.java index 00a16544d6e5..59222286caac 100644 --- a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformRecord.java +++ b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformRecord.java @@ -47,6 +47,7 @@ import org.apache.nifi.serialization.record.util.DataTypeUtils; import org.apache.nifi.util.StopWatch; +import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.ArrayList; @@ -326,6 +327,13 @@ private void processUniform(final ProcessContext context, final ProcessSession s } writeResult = writer.finishRecordSet(); + + try { + writer.close(); + } catch (final IOException ioe) { + getLogger().warn("Failed to close Writer for {}", transformed); + } + attributes.put("record.count", String.valueOf(writeResult.getRecordCount())); attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType()); attributes.putAll(writeResult.getAttributes());