diff --git a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java index de0b7dde17f9..003101ed478b 100644 --- a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java +++ b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java @@ -106,8 +106,6 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader { * @param nestedFieldName the name of the field to start the processing from * @param captureFieldPredicate predicate that takes a JSON fieldName and fieldValue to capture top-level non-processed fields which can * be accessed by calling {@link #getCapturedFields()} - * @param allowComments whether to allow comments within the JSON stream - * @param streamReadConstraints configuration for the JsonFactory stream reader {@link StreamReadConstraints} * @param tokenParserFactory factory to provide an instance of com.fasterxml.jackson.core.JsonParser * @throws IOException in case of JSON stream processing failure * @throws MalformedRecordException in case of malformed JSON input @@ -120,8 +118,6 @@ protected AbstractJsonRowRecordReader(final InputStream in, final StartingFieldStrategy strategy, final String nestedFieldName, final BiPredicate captureFieldPredicate, - final boolean allowComments, - final StreamReadConstraints streamReadConstraints, final TokenParserFactory tokenParserFactory) throws IOException, MalformedRecordException { @@ -137,8 +133,7 @@ protected AbstractJsonRowRecordReader(final InputStream in, capturedFields = new LinkedHashMap<>(); try { - final StreamReadConstraints configuredStreamReadConstraints = streamReadConstraints == null ? DEFAULT_STREAM_READ_CONSTRAINTS : streamReadConstraints; - jsonParser = tokenParserFactory.getJsonParser(in, configuredStreamReadConstraints, allowComments); + jsonParser = tokenParserFactory.getJsonParser(in); jsonParser.enable(Feature.USE_FAST_DOUBLE_PARSER); jsonParser.enable(Feature.USE_FAST_BIG_NUMBER_PARSER); } catch (final JsonParseException e) { diff --git a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonParserFactory.java b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonParserFactory.java index d2e7484b3dee..827e7751b605 100644 --- a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonParserFactory.java +++ b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonParserFactory.java @@ -26,17 +26,37 @@ import java.util.Objects; public class JsonParserFactory implements TokenParserFactory { - @Override - public JsonParser getJsonParser(final InputStream in, final StreamReadConstraints streamReadConstraints, final boolean allowComments) throws IOException { - Objects.requireNonNull(in, "Input Stream required"); + private static final ObjectMapper defaultObjectMapper = new ObjectMapper(); + + private final JsonFactory jsonFactory; + + /** + * JSON Parser Factory constructor using default ObjectMapper and associated configuration options + */ + public JsonParserFactory() { + jsonFactory = defaultObjectMapper.getFactory(); + } + + /** + * JSON Parser Factory constructor with configurable constraints + * + * @param streamReadConstraints Stream Read Constraints + * @param allowComments Allow Comments during parsing + */ + public JsonParserFactory(final StreamReadConstraints streamReadConstraints, final boolean allowComments) { Objects.requireNonNull(streamReadConstraints, "Stream Read Constraints required"); final ObjectMapper objectMapper = new ObjectMapper(); - objectMapper.getFactory().setStreamReadConstraints(streamReadConstraints); if (allowComments) { objectMapper.enable(JsonParser.Feature.ALLOW_COMMENTS); } - final JsonFactory jsonFactory = objectMapper.getFactory(); + jsonFactory = objectMapper.getFactory(); + jsonFactory.setStreamReadConstraints(streamReadConstraints); + } + + @Override + public JsonParser getJsonParser(final InputStream in) throws IOException { + Objects.requireNonNull(in, "Input Stream required"); return jsonFactory.createParser(in); } } diff --git a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java index db6e606e0e12..a8731854edef 100644 --- a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java +++ b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java @@ -68,7 +68,7 @@ public JsonPathRowRecordReader(final LinkedHashMap jsonPaths, final boolean allowComments, final StreamReadConstraints streamReadConstraints) throws MalformedRecordException, IOException { - super(in, logger, dateFormat, timeFormat, timestampFormat, null, null, null, allowComments, streamReadConstraints, new JsonParserFactory()); + super(in, logger, dateFormat, timeFormat, timestampFormat, null, null, null, new JsonParserFactory(streamReadConstraints, allowComments)); this.schema = schema; this.jsonPaths = jsonPaths; diff --git a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonRecordSource.java b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonRecordSource.java index 904083d82085..40ae91a72d6f 100644 --- a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonRecordSource.java +++ b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonRecordSource.java @@ -35,24 +35,25 @@ public class JsonRecordSource implements RecordSource { private static final boolean ALLOW_COMMENTS_ENABLED = true; + private static final TokenParserFactory defaultTokenParserFactory = new JsonParserFactory(DEFAULT_STREAM_READ_CONSTRAINTS, ALLOW_COMMENTS_ENABLED); + private final JsonParser jsonParser; private final StartingFieldStrategy strategy; public JsonRecordSource(final InputStream in) throws IOException { - this(in, null, null, DEFAULT_STREAM_READ_CONSTRAINTS); + this(in, null, null, defaultTokenParserFactory); } public JsonRecordSource(final InputStream in, StreamReadConstraints streamReadConstraints) throws IOException { this(in, null, null, streamReadConstraints); } - public JsonRecordSource(final InputStream in, final StartingFieldStrategy strategy, final String startingFieldName, StreamReadConstraints streamReadConstraints) throws IOException { - this(in, strategy, startingFieldName, new JsonParserFactory(), streamReadConstraints); + public JsonRecordSource(final InputStream in, final StartingFieldStrategy strategy, final String startingFieldName, final StreamReadConstraints streamReadConstraints) throws IOException { + this(in, strategy, startingFieldName, new JsonParserFactory(streamReadConstraints, ALLOW_COMMENTS_ENABLED)); } - public JsonRecordSource(final InputStream in, final StartingFieldStrategy strategy, final String startingFieldName, TokenParserFactory tokenParserFactory, - StreamReadConstraints streamReadConstraints) throws IOException { - jsonParser = tokenParserFactory.getJsonParser(in, streamReadConstraints, ALLOW_COMMENTS_ENABLED); + public JsonRecordSource(final InputStream in, final StartingFieldStrategy strategy, final String startingFieldName, final TokenParserFactory tokenParserFactory) throws IOException { + jsonParser = tokenParserFactory.getJsonParser(in); this.strategy = strategy; if (strategy == StartingFieldStrategy.NESTED_FIELD) { diff --git a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java index 43cc121f1af9..aade3d104074 100644 --- a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java +++ b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java @@ -17,7 +17,6 @@ package org.apache.nifi.json; -import com.fasterxml.jackson.core.StreamReadConstraints; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ArrayNode; @@ -53,15 +52,21 @@ public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader { private final RecordSchema schema; - public JsonTreeRowRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema, - final String dateFormat, final String timeFormat, final String timestampFormat, - final StartingFieldStrategy startingFieldStrategy, final String startingFieldName, - final SchemaApplicationStrategy schemaApplicationStrategy, final BiPredicate captureFieldPredicate, - final boolean allowComments, final StreamReadConstraints streamReadConstraints, final TokenParserFactory tokenParserFactory) - throws IOException, MalformedRecordException { - - super(in, logger, dateFormat, timeFormat, timestampFormat, startingFieldStrategy, startingFieldName, captureFieldPredicate, - allowComments, streamReadConstraints, tokenParserFactory); + public JsonTreeRowRecordReader( + final InputStream in, + final ComponentLog logger, + final RecordSchema schema, + final String dateFormat, + final String timeFormat, + final String timestampFormat, + final StartingFieldStrategy startingFieldStrategy, + final String startingFieldName, + final SchemaApplicationStrategy schemaApplicationStrategy, + final BiPredicate captureFieldPredicate, + final TokenParserFactory tokenParserFactory + ) throws IOException, MalformedRecordException { + + super(in, logger, dateFormat, timeFormat, timestampFormat, startingFieldStrategy, startingFieldName, captureFieldPredicate, tokenParserFactory); if (startingFieldStrategy == StartingFieldStrategy.NESTED_FIELD && schemaApplicationStrategy == SchemaApplicationStrategy.WHOLE_JSON) { this.schema = getSelectedSchema(schema, startingFieldName); diff --git a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/TokenParserFactory.java b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/TokenParserFactory.java index 1c3a812fe2a4..dfeb4ab5ebc2 100644 --- a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/TokenParserFactory.java +++ b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/TokenParserFactory.java @@ -17,20 +17,17 @@ package org.apache.nifi.json; import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.StreamReadConstraints; import java.io.IOException; import java.io.InputStream; public interface TokenParserFactory { /** - * Get JSON Parser implementation for provided Input Stream with configured settings + * Get JSON Parser implementation for provided Input Stream with preconfigured settings * * @param in Input Stream to be parsed - * @param streamReadConstraints Stream Read Constraints applied - * @param allowComments Whether to allow comments when parsing * @return JSON Parser * @throws IOException Thrown on failures to read the Input Stream */ - JsonParser getJsonParser(InputStream in, StreamReadConstraints streamReadConstraints, boolean allowComments) throws IOException; + JsonParser getJsonParser(InputStream in) throws IOException; } diff --git a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-yaml-record-utils/src/main/java/org/apache/nifi/yaml/YamlParserFactory.java b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-yaml-record-utils/src/main/java/org/apache/nifi/yaml/YamlParserFactory.java index 9fd65a4638cc..a79c7d417b59 100644 --- a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-yaml-record-utils/src/main/java/org/apache/nifi/yaml/YamlParserFactory.java +++ b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-yaml-record-utils/src/main/java/org/apache/nifi/yaml/YamlParserFactory.java @@ -25,26 +25,48 @@ import java.io.IOException; import java.io.InputStream; +import java.util.Objects; public class YamlParserFactory implements TokenParserFactory { + private static final YAMLMapper yamlMapper = new YAMLMapper(); + + private final YAMLFactory yamlFactory; + + /** + * YAML Parser Factory constructor with default configuration for YAML Mapper + */ + public YamlParserFactory() { + yamlFactory = YAMLFactory.builder().build(); + yamlFactory.setCodec(yamlMapper); + } + + /** + * YAML Parser Factory constructor with configurable parsing constraints + * + * @param streamReadConstraints Stream Read Constraints required + * @param allowComments Allow Comments during parsing + */ + public YamlParserFactory(final StreamReadConstraints streamReadConstraints, final boolean allowComments) { + Objects.requireNonNull(streamReadConstraints, "Stream Read Constraints required"); + + final LoaderOptions loaderOptions = new LoaderOptions(); + loaderOptions.setCodePointLimit(streamReadConstraints.getMaxStringLength()); + loaderOptions.setProcessComments(allowComments); + + yamlFactory = YAMLFactory.builder().loaderOptions(loaderOptions).build(); + yamlFactory.setCodec(yamlMapper); + } + /** * Get Parser implementation for YAML * * @param in Input Stream to be parsed - * @param streamReadConstraints Stream Read Constraints are not supported in YAML - * @param allowComments Whether to allow comments when parsing does not apply to YAML * @return YAML Parser * @throws IOException Thrown on parser creation failures */ @Override - public JsonParser getJsonParser(final InputStream in, final StreamReadConstraints streamReadConstraints, final boolean allowComments) throws IOException { - final LoaderOptions loaderOptions = new LoaderOptions(); - loaderOptions.setCodePointLimit(streamReadConstraints.getMaxStringLength()); - final YAMLFactory yamlFactory = YAMLFactory.builder() - .loaderOptions(loaderOptions) - .build(); - - return yamlFactory.setCodec(new YAMLMapper()).createParser(in); + public JsonParser getJsonParser(final InputStream in) throws IOException { + return yamlFactory.createParser(in); } } diff --git a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-yaml-record-utils/src/main/java/org/apache/nifi/yaml/YamlRecordSource.java b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-yaml-record-utils/src/main/java/org/apache/nifi/yaml/YamlRecordSource.java deleted file mode 100644 index e9a07c0895e4..000000000000 --- a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-yaml-record-utils/src/main/java/org/apache/nifi/yaml/YamlRecordSource.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.yaml; - -import com.fasterxml.jackson.core.StreamReadConstraints; -import org.apache.nifi.json.JsonRecordSource; -import org.apache.nifi.json.StartingFieldStrategy; - -import java.io.IOException; -import java.io.InputStream; - -public class YamlRecordSource extends JsonRecordSource { - public YamlRecordSource(final InputStream in, final StartingFieldStrategy strategy, final String startingFieldName, StreamReadConstraints streamReadConstraints) throws IOException { - super(in, strategy, startingFieldName, new YamlParserFactory(), streamReadConstraints); - } -} diff --git a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-yaml-record-utils/src/main/java/org/apache/nifi/yaml/YamlTreeRowRecordReader.java b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-yaml-record-utils/src/main/java/org/apache/nifi/yaml/YamlTreeRowRecordReader.java index 25136d85aa3c..93048df79dec 100644 --- a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-yaml-record-utils/src/main/java/org/apache/nifi/yaml/YamlTreeRowRecordReader.java +++ b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-yaml-record-utils/src/main/java/org/apache/nifi/yaml/YamlTreeRowRecordReader.java @@ -30,6 +30,8 @@ public class YamlTreeRowRecordReader extends JsonTreeRowRecordReader { + private static final YamlParserFactory yamlParserFactory = new YamlParserFactory(); + public YamlTreeRowRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema, final String dateFormat, final String timeFormat, final String timestampFormat) throws IOException, MalformedRecordException { this(in, logger, schema, dateFormat, timeFormat, timestampFormat, null, null, null, null); @@ -42,6 +44,6 @@ public YamlTreeRowRecordReader(final InputStream in, final ComponentLog logger, throws IOException, MalformedRecordException { super(in, logger, schema, dateFormat, timeFormat, timestampFormat, startingFieldStrategy, startingFieldName, schemaApplicationStrategy, - captureFieldPredicate, true, null, new YamlParserFactory()); + captureFieldPredicate, yamlParserFactory); } } diff --git a/nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/RecordTransformProxy.java b/nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/RecordTransformProxy.java index e6f981165164..8dd10705b786 100644 --- a/nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/RecordTransformProxy.java +++ b/nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/RecordTransformProxy.java @@ -76,6 +76,7 @@ public class RecordTransformProxy extends PythonProcessorProxy .required(true) .build(); + private static final JsonParserFactory jsonParserFactory = new JsonParserFactory(); public RecordTransformProxy(final String processorType, final Supplier bridgeFactory, final boolean initialize) { super(processorType, bridgeFactory, initialize); @@ -303,7 +304,7 @@ private Record createRecordFromJson(final RecordTransformResult transformResult) try (final InputStream in = new ByteArrayInputStream(jsonBytes)) { final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, getLogger(), schema, null, null, null, null, - null, null, null, false, null, new JsonParserFactory()); + null, null, null, jsonParserFactory); final Record record = reader.nextRecord(false, false); return record; } diff --git a/nifi-extension-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java b/nifi-extension-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java index 80f7437da400..b8fd2899699b 100644 --- a/nifi-extension-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java +++ b/nifi-extension-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java @@ -274,6 +274,7 @@ public class QuerySalesforceObject extends AbstractProcessor { private static final JsonFactory JSON_FACTORY = OBJECT_MAPPER.getFactory(); private static final String TOTAL_RECORD_COUNT_ATTRIBUTE = "total.record.count"; private static final int MAX_RECORD_COUNT = 2000; + private static final JsonParserFactory jsonParserFactory = new JsonParserFactory(); private volatile SalesforceToRecordSchemaConverter salesForceToRecordSchemaConverter; private volatile SalesforceRestClient salesforceRestService; @@ -508,9 +509,7 @@ private JsonTreeRowRecordReader createJsonReader(InputStream querySObjectResultI STARTING_FIELD_NAME, SchemaApplicationStrategy.SELECTED_PART, CAPTURE_PREDICATE, - false, - null, - new JsonParserFactory() + jsonParserFactory ); } diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestForkRecord.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestForkRecord.java index 4c10b7d52906..db0fff865d64 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestForkRecord.java +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestForkRecord.java @@ -453,6 +453,8 @@ public void testExtractMode() throws InitializationException, IOException { private class JsonRecordReader extends AbstractControllerService implements RecordReaderFactory { + private static final JsonParserFactory jsonParserFactory = new JsonParserFactory(); + RecordSchema schema; public JsonRecordReader(RecordSchema schema) { @@ -460,14 +462,14 @@ public JsonRecordReader(RecordSchema schema) { } @Override - public RecordReader createRecordReader(FlowFile flowFile, InputStream in, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException { - return new JsonTreeRowRecordReader(in, logger, schema, dateFormat, timeFormat, timestampFormat, null, null, null, null, false, null, new JsonParserFactory()); + public RecordReader createRecordReader(FlowFile flowFile, InputStream in, ComponentLog logger) throws MalformedRecordException, IOException { + return new JsonTreeRowRecordReader(in, logger, schema, dateFormat, timeFormat, timestampFormat, null, null, null, null, jsonParserFactory); } @Override public RecordReader createRecordReader(Map variables, InputStream in, long inputLength, ComponentLog logger) - throws MalformedRecordException, IOException, SchemaNotFoundException { - return new JsonTreeRowRecordReader(in, logger, schema, dateFormat, timeFormat, timestampFormat, null, null, null, null, false, null, new JsonParserFactory()); + throws MalformedRecordException, IOException { + return new JsonTreeRowRecordReader(in, logger, schema, dateFormat, timeFormat, timestampFormat, null, null, null, null, jsonParserFactory); } } diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java index 8980ddd82318..bdc649122cb9 100644 --- a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java +++ b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java @@ -74,8 +74,7 @@ public class JsonTreeReader extends SchemaRegistryService implements RecordReade protected volatile String startingFieldName; protected volatile StartingFieldStrategy startingFieldStrategy; protected volatile SchemaApplicationStrategy schemaApplicationStrategy; - protected volatile StreamReadConstraints streamReadConstraints; - private volatile boolean allowComments; + protected volatile TokenParserFactory tokenParserFactory; public static final PropertyDescriptor STARTING_FIELD_STRATEGY = new PropertyDescriptor.Builder() .name("starting-field-strategy") @@ -135,8 +134,11 @@ public void storePropertyValues(final ConfigurationContext context) { this.startingFieldStrategy = StartingFieldStrategy.valueOf(context.getProperty(STARTING_FIELD_STRATEGY).getValue()); this.startingFieldName = context.getProperty(STARTING_FIELD_NAME).getValue(); this.schemaApplicationStrategy = SchemaApplicationStrategy.valueOf(context.getProperty(SCHEMA_APPLICATION_STRATEGY).getValue()); - this.streamReadConstraints = buildStreamReadConstraints(context); - this.allowComments = isAllowCommentsEnabled(context); + this.tokenParserFactory = createTokenParserFactory(context); + } + + protected TokenParserFactory createTokenParserFactory(final ConfigurationContext context) { + return new JsonParserFactory(buildStreamReadConstraints(context), isAllowCommentsEnabled(context)); } /** @@ -179,7 +181,7 @@ protected SchemaAccessStrategy getSchemaAccessStrategy(final String schemaAccess } protected RecordSourceFactory createJsonRecordSourceFactory() { - return (variables, in) -> new JsonRecordSource(in, startingFieldStrategy, startingFieldName, streamReadConstraints); + return (variables, in) -> new JsonRecordSource(in, startingFieldStrategy, startingFieldName, tokenParserFactory); } @Override @@ -195,6 +197,6 @@ public RecordReader createRecordReader(final Map variables, fina protected JsonTreeRowRecordReader createJsonTreeRowRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema) throws IOException, MalformedRecordException { return new JsonTreeRowRecordReader(in, logger, schema, dateFormat, timeFormat, timestampFormat, startingFieldStrategy, startingFieldName, - schemaApplicationStrategy, null, allowComments, streamReadConstraints, new JsonParserFactory()); + schemaApplicationStrategy, null, tokenParserFactory); } } diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/yaml/YamlTreeReader.java b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/yaml/YamlTreeReader.java index abc0820e5056..8569103d3744 100644 --- a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/yaml/YamlTreeReader.java +++ b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/yaml/YamlTreeReader.java @@ -17,15 +17,14 @@ package org.apache.nifi.yaml; -import com.fasterxml.jackson.databind.JsonNode; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.json.JsonTreeReader; import org.apache.nifi.json.JsonTreeRowRecordReader; +import org.apache.nifi.json.TokenParserFactory; import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.schema.inference.RecordSourceFactory; import org.apache.nifi.serialization.MalformedRecordException; import org.apache.nifi.serialization.record.RecordSchema; @@ -53,8 +52,8 @@ protected List getSupportedPropertyDescriptors() { } @Override - protected RecordSourceFactory createJsonRecordSourceFactory() { - return (var, in) -> new YamlRecordSource(in, startingFieldStrategy, startingFieldName, streamReadConstraints); + protected TokenParserFactory createTokenParserFactory(final ConfigurationContext context) { + return new YamlParserFactory(buildStreamReadConstraints(context), isAllowCommentsEnabled(context)); } @Override diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java index f08e8f129c74..abc6861b2eb7 100644 --- a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java +++ b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java @@ -1329,7 +1329,15 @@ private JsonTreeRowRecordReader createJsonTreeRowRecordReader(InputStream inputS StartingFieldStrategy startingFieldStrategy, String startingFieldName, SchemaApplicationStrategy schemaApplicationStrategy, BiPredicate captureFieldPredicate, boolean allowComments, StreamReadConstraints streamReadConstraints) throws Exception { + + final TokenParserFactory tokenParserFactory; + if (streamReadConstraints == null) { + tokenParserFactory = new JsonParserFactory(); + } else { + tokenParserFactory = new JsonParserFactory(streamReadConstraints, allowComments); + } + return new JsonTreeRowRecordReader(inputStream, log, recordSchema, dateFormat, timeFormat, timestampFormat, startingFieldStrategy, startingFieldName, schemaApplicationStrategy, - captureFieldPredicate, allowComments, streamReadConstraints, new JsonParserFactory()); + captureFieldPredicate, tokenParserFactory); } } diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/yaml/TestYamlTreeRowRecordReader.java b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/yaml/TestYamlTreeRowRecordReader.java index 4b42ff75e6b0..867af70089f2 100644 --- a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/yaml/TestYamlTreeRowRecordReader.java +++ b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/yaml/TestYamlTreeRowRecordReader.java @@ -21,6 +21,7 @@ import org.apache.avro.Schema; import org.apache.commons.io.FileUtils; import org.apache.nifi.avro.AvroTypeUtil; +import org.apache.nifi.json.JsonRecordSource; import org.apache.nifi.json.JsonSchemaInference; import org.apache.nifi.json.JsonTreeRowRecordReader; import org.apache.nifi.json.SchemaApplicationStrategy; @@ -1091,7 +1092,7 @@ private void testNestedReadRecords(InputStream yamlStream, private RecordSchema inferSchema(InputStream jsonStream, StartingFieldStrategy strategy, String startingFieldName) throws IOException { RecordSchema schema = new InferSchemaAccessStrategy<>( - (__, inputStream) -> new YamlRecordSource(inputStream, strategy, startingFieldName, StreamReadConstraints.defaults()), + (__, inputStream) -> new JsonRecordSource(inputStream, strategy, startingFieldName, new YamlParserFactory()), new JsonSchemaInference(new TimeValueInference(null, null, null)), mock(ComponentLog.class) ).getSchema(Collections.emptyMap(), jsonStream, null);