Skip to content

Commit

Permalink
NIFI-14155 Refactored TokenParserFactory to avoid unnecessary Mapper creation
Browse files Browse the repository at this point in the history
  • Loading branch information
exceptionfactory committed Jan 17, 2025
1 parent 4ae0a47 commit 9dd5407
Show file tree
Hide file tree
Showing 16 changed files with 118 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,6 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
* @param nestedFieldName the name of the field to start the processing from
* @param captureFieldPredicate predicate that takes a JSON fieldName and fieldValue to capture top-level non-processed fields which can
* be accessed by calling {@link #getCapturedFields()}
* @param allowComments whether to allow comments within the JSON stream
* @param streamReadConstraints configuration for the JsonFactory stream reader {@link StreamReadConstraints}
* @param tokenParserFactory factory to provide an instance of com.fasterxml.jackson.core.JsonParser
* @throws IOException in case of JSON stream processing failure
* @throws MalformedRecordException in case of malformed JSON input
Expand All @@ -120,8 +118,6 @@ protected AbstractJsonRowRecordReader(final InputStream in,
final StartingFieldStrategy strategy,
final String nestedFieldName,
final BiPredicate<String, String> captureFieldPredicate,
final boolean allowComments,
final StreamReadConstraints streamReadConstraints,
final TokenParserFactory tokenParserFactory)
throws IOException, MalformedRecordException {

Expand All @@ -137,8 +133,7 @@ protected AbstractJsonRowRecordReader(final InputStream in,
capturedFields = new LinkedHashMap<>();

try {
final StreamReadConstraints configuredStreamReadConstraints = streamReadConstraints == null ? DEFAULT_STREAM_READ_CONSTRAINTS : streamReadConstraints;
jsonParser = tokenParserFactory.getJsonParser(in, configuredStreamReadConstraints, allowComments);
jsonParser = tokenParserFactory.getJsonParser(in);
jsonParser.enable(Feature.USE_FAST_DOUBLE_PARSER);
jsonParser.enable(Feature.USE_FAST_BIG_NUMBER_PARSER);
} catch (final JsonParseException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,37 @@
import java.util.Objects;

public class JsonParserFactory implements TokenParserFactory {
@Override
public JsonParser getJsonParser(final InputStream in, final StreamReadConstraints streamReadConstraints, final boolean allowComments) throws IOException {
Objects.requireNonNull(in, "Input Stream required");
private static final ObjectMapper defaultObjectMapper = new ObjectMapper();

private final JsonFactory jsonFactory;

/**
* JSON Parser Factory constructor using default ObjectMapper and associated configuration options
*/
public JsonParserFactory() {
jsonFactory = defaultObjectMapper.getFactory();
}

/**
* JSON Parser Factory constructor with configurable constraints
*
* @param streamReadConstraints Stream Read Constraints
* @param allowComments Allow Comments during parsing
*/
public JsonParserFactory(final StreamReadConstraints streamReadConstraints, final boolean allowComments) {
Objects.requireNonNull(streamReadConstraints, "Stream Read Constraints required");

final ObjectMapper objectMapper = new ObjectMapper();
objectMapper.getFactory().setStreamReadConstraints(streamReadConstraints);
if (allowComments) {
objectMapper.enable(JsonParser.Feature.ALLOW_COMMENTS);
}
final JsonFactory jsonFactory = objectMapper.getFactory();
jsonFactory = objectMapper.getFactory();
jsonFactory.setStreamReadConstraints(streamReadConstraints);
}

@Override
public JsonParser getJsonParser(final InputStream in) throws IOException {
Objects.requireNonNull(in, "Input Stream required");
return jsonFactory.createParser(in);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public JsonPathRowRecordReader(final LinkedHashMap<String, JsonPath> jsonPaths,
final boolean allowComments, final StreamReadConstraints streamReadConstraints)
throws MalformedRecordException, IOException {

super(in, logger, dateFormat, timeFormat, timestampFormat, null, null, null, allowComments, streamReadConstraints, new JsonParserFactory());
super(in, logger, dateFormat, timeFormat, timestampFormat, null, null, null, new JsonParserFactory(streamReadConstraints, allowComments));

this.schema = schema;
this.jsonPaths = jsonPaths;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,24 +35,25 @@ public class JsonRecordSource implements RecordSource<JsonNode> {

private static final boolean ALLOW_COMMENTS_ENABLED = true;

private static final TokenParserFactory defaultTokenParserFactory = new JsonParserFactory(DEFAULT_STREAM_READ_CONSTRAINTS, ALLOW_COMMENTS_ENABLED);

private final JsonParser jsonParser;
private final StartingFieldStrategy strategy;

public JsonRecordSource(final InputStream in) throws IOException {
this(in, null, null, DEFAULT_STREAM_READ_CONSTRAINTS);
this(in, null, null, defaultTokenParserFactory);
}

public JsonRecordSource(final InputStream in, StreamReadConstraints streamReadConstraints) throws IOException {
this(in, null, null, streamReadConstraints);
}

public JsonRecordSource(final InputStream in, final StartingFieldStrategy strategy, final String startingFieldName, StreamReadConstraints streamReadConstraints) throws IOException {
this(in, strategy, startingFieldName, new JsonParserFactory(), streamReadConstraints);
public JsonRecordSource(final InputStream in, final StartingFieldStrategy strategy, final String startingFieldName, final StreamReadConstraints streamReadConstraints) throws IOException {
this(in, strategy, startingFieldName, new JsonParserFactory(streamReadConstraints, ALLOW_COMMENTS_ENABLED));
}

public JsonRecordSource(final InputStream in, final StartingFieldStrategy strategy, final String startingFieldName, TokenParserFactory tokenParserFactory,
StreamReadConstraints streamReadConstraints) throws IOException {
jsonParser = tokenParserFactory.getJsonParser(in, streamReadConstraints, ALLOW_COMMENTS_ENABLED);
public JsonRecordSource(final InputStream in, final StartingFieldStrategy strategy, final String startingFieldName, final TokenParserFactory tokenParserFactory) throws IOException {
jsonParser = tokenParserFactory.getJsonParser(in);
this.strategy = strategy;

if (strategy == StartingFieldStrategy.NESTED_FIELD) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.nifi.json;

import com.fasterxml.jackson.core.StreamReadConstraints;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;

Expand Down Expand Up @@ -53,15 +52,21 @@ public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader {

private final RecordSchema schema;

public JsonTreeRowRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema,
final String dateFormat, final String timeFormat, final String timestampFormat,
final StartingFieldStrategy startingFieldStrategy, final String startingFieldName,
final SchemaApplicationStrategy schemaApplicationStrategy, final BiPredicate<String, String> captureFieldPredicate,
final boolean allowComments, final StreamReadConstraints streamReadConstraints, final TokenParserFactory tokenParserFactory)
throws IOException, MalformedRecordException {

super(in, logger, dateFormat, timeFormat, timestampFormat, startingFieldStrategy, startingFieldName, captureFieldPredicate,
allowComments, streamReadConstraints, tokenParserFactory);
public JsonTreeRowRecordReader(
final InputStream in,
final ComponentLog logger,
final RecordSchema schema,
final String dateFormat,
final String timeFormat,
final String timestampFormat,
final StartingFieldStrategy startingFieldStrategy,
final String startingFieldName,
final SchemaApplicationStrategy schemaApplicationStrategy,
final BiPredicate<String, String> captureFieldPredicate,
final TokenParserFactory tokenParserFactory
) throws IOException, MalformedRecordException {

super(in, logger, dateFormat, timeFormat, timestampFormat, startingFieldStrategy, startingFieldName, captureFieldPredicate, tokenParserFactory);

if (startingFieldStrategy == StartingFieldStrategy.NESTED_FIELD && schemaApplicationStrategy == SchemaApplicationStrategy.WHOLE_JSON) {
this.schema = getSelectedSchema(schema, startingFieldName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,17 @@
package org.apache.nifi.json;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.StreamReadConstraints;

import java.io.IOException;
import java.io.InputStream;

public interface TokenParserFactory {
/**
* Get JSON Parser implementation for provided Input Stream with configured settings
* Get JSON Parser implementation for provided Input Stream with preconfigured settings
*
* @param in Input Stream to be parsed
* @param streamReadConstraints Stream Read Constraints applied
* @param allowComments Whether to allow comments when parsing
* @return JSON Parser
* @throws IOException Thrown on failures to read the Input Stream
*/
JsonParser getJsonParser(InputStream in, StreamReadConstraints streamReadConstraints, boolean allowComments) throws IOException;
JsonParser getJsonParser(InputStream in) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,48 @@

import java.io.IOException;
import java.io.InputStream;
import java.util.Objects;

public class YamlParserFactory implements TokenParserFactory {

private static final YAMLMapper yamlMapper = new YAMLMapper();

private final YAMLFactory yamlFactory;

/**
* YAML Parser Factory constructor with default configuration for YAML Mapper
*/
public YamlParserFactory() {
yamlFactory = YAMLFactory.builder().build();
yamlFactory.setCodec(yamlMapper);
}

/**
* YAML Parser Factory constructor with configurable parsing constraints
*
* @param streamReadConstraints Stream Read Constraints required
* @param allowComments Allow Comments during parsing
*/
public YamlParserFactory(final StreamReadConstraints streamReadConstraints, final boolean allowComments) {
Objects.requireNonNull(streamReadConstraints, "Stream Read Constraints required");

final LoaderOptions loaderOptions = new LoaderOptions();
loaderOptions.setCodePointLimit(streamReadConstraints.getMaxStringLength());
loaderOptions.setProcessComments(allowComments);

yamlFactory = YAMLFactory.builder().loaderOptions(loaderOptions).build();
yamlFactory.setCodec(yamlMapper);
}

/**
* Get Parser implementation for YAML
*
* @param in Input Stream to be parsed
* @param streamReadConstraints Stream Read Constraints are not supported in YAML
* @param allowComments Whether to allow comments when parsing does not apply to YAML
* @return YAML Parser
* @throws IOException Thrown on parser creation failures
*/
@Override
public JsonParser getJsonParser(final InputStream in, final StreamReadConstraints streamReadConstraints, final boolean allowComments) throws IOException {
final LoaderOptions loaderOptions = new LoaderOptions();
loaderOptions.setCodePointLimit(streamReadConstraints.getMaxStringLength());
final YAMLFactory yamlFactory = YAMLFactory.builder()
.loaderOptions(loaderOptions)
.build();

return yamlFactory.setCodec(new YAMLMapper()).createParser(in);
public JsonParser getJsonParser(final InputStream in) throws IOException {
return yamlFactory.createParser(in);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@

public class YamlTreeRowRecordReader extends JsonTreeRowRecordReader {

private static final YamlParserFactory yamlParserFactory = new YamlParserFactory();

public YamlTreeRowRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema,
final String dateFormat, final String timeFormat, final String timestampFormat) throws IOException, MalformedRecordException {
this(in, logger, schema, dateFormat, timeFormat, timestampFormat, null, null, null, null);
Expand All @@ -42,6 +44,6 @@ public YamlTreeRowRecordReader(final InputStream in, final ComponentLog logger,
throws IOException, MalformedRecordException {

super(in, logger, schema, dateFormat, timeFormat, timestampFormat, startingFieldStrategy, startingFieldName, schemaApplicationStrategy,
captureFieldPredicate, true, null, new YamlParserFactory());
captureFieldPredicate, yamlParserFactory);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public class RecordTransformProxy extends PythonProcessorProxy<RecordTransform>
.required(true)
.build();

private static final JsonParserFactory jsonParserFactory = new JsonParserFactory();

public RecordTransformProxy(final String processorType, final Supplier<PythonProcessorBridge> bridgeFactory, final boolean initialize) {
super(processorType, bridgeFactory, initialize);
Expand Down Expand Up @@ -303,7 +304,7 @@ private Record createRecordFromJson(final RecordTransformResult transformResult)

try (final InputStream in = new ByteArrayInputStream(jsonBytes)) {
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, getLogger(), schema, null, null, null, null,
null, null, null, false, null, new JsonParserFactory());
null, null, null, jsonParserFactory);
final Record record = reader.nextRecord(false, false);
return record;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ public class QuerySalesforceObject extends AbstractProcessor {
private static final JsonFactory JSON_FACTORY = OBJECT_MAPPER.getFactory();
private static final String TOTAL_RECORD_COUNT_ATTRIBUTE = "total.record.count";
private static final int MAX_RECORD_COUNT = 2000;
private static final JsonParserFactory jsonParserFactory = new JsonParserFactory();

private volatile SalesforceToRecordSchemaConverter salesForceToRecordSchemaConverter;
private volatile SalesforceRestClient salesforceRestService;
Expand Down Expand Up @@ -508,9 +509,7 @@ private JsonTreeRowRecordReader createJsonReader(InputStream querySObjectResultI
STARTING_FIELD_NAME,
SchemaApplicationStrategy.SELECTED_PART,
CAPTURE_PREDICATE,
false,
null,
new JsonParserFactory()
jsonParserFactory
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -453,21 +453,23 @@ public void testExtractMode() throws InitializationException, IOException {

private class JsonRecordReader extends AbstractControllerService implements RecordReaderFactory {

private static final JsonParserFactory jsonParserFactory = new JsonParserFactory();

RecordSchema schema;

public JsonRecordReader(RecordSchema schema) {
this.schema = schema;
}

@Override
public RecordReader createRecordReader(FlowFile flowFile, InputStream in, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException {
return new JsonTreeRowRecordReader(in, logger, schema, dateFormat, timeFormat, timestampFormat, null, null, null, null, false, null, new JsonParserFactory());
public RecordReader createRecordReader(FlowFile flowFile, InputStream in, ComponentLog logger) throws MalformedRecordException, IOException {
return new JsonTreeRowRecordReader(in, logger, schema, dateFormat, timeFormat, timestampFormat, null, null, null, null, jsonParserFactory);
}

@Override
public RecordReader createRecordReader(Map<String, String> variables, InputStream in, long inputLength, ComponentLog logger)
throws MalformedRecordException, IOException, SchemaNotFoundException {
return new JsonTreeRowRecordReader(in, logger, schema, dateFormat, timeFormat, timestampFormat, null, null, null, null, false, null, new JsonParserFactory());
throws MalformedRecordException, IOException {
return new JsonTreeRowRecordReader(in, logger, schema, dateFormat, timeFormat, timestampFormat, null, null, null, null, jsonParserFactory);
}
}

Expand Down
Loading

0 comments on commit 9dd5407

Please sign in to comment.