@@ -55,7 +55,7 @@
import java.util.stream.Stream;

public abstract class AbstractPutElasticsearch extends AbstractProcessor implements ElasticsearchRestProcessor {
static final Relationship REL_ORIGINAL = new Relationship.Builder()
public static final Relationship REL_ORIGINAL = new Relationship.Builder()
.name("original")
.description("All flowfiles that are sent to Elasticsearch without request failures go to this relationship.")
.build();
@@ -61,6 +61,7 @@
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.PushBackRecordSet;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
@@ -79,6 +80,8 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -420,22 +423,28 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
try (final InputStream inStream = session.read(input);
final RecordReader reader = readerFactory.createRecordReader(input, inStream, getLogger())) {
final PushBackRecordSet recordSet = new PushBackRecordSet(reader.createRecordSet());
final RecordSchema recordSchema = reader.getSchema();
final List<IndexOperationRequest> operationList = new ArrayList<>();
final List<Record> originals = new ArrayList<>();
final List<Record> processedRecords = new ArrayList<>();
final List<Record> originalRecords = new ArrayList<>();

Record record;
while ((record = recordSet.next()) != null) {
addOperation(operationList, record, indexOperationParameters, indices, types);
originals.add(record);
final Record originalRecord = cloneRecord(record);
final Record processedRecord = cloneRecord(record);

Comment on lines +433 to +434

Contributor:
Is it necessary to make two clones of the original record?

Contributor (Author):
Yes, it is. recordSet.next() reuses the same Record instance each time, so we take one deep copy before adding it to processedRecords. That lets us mutate fields (for example, through the various fieldValue.updateValue(...) calls) without those changes leaking into later iterations. We take a second deep copy for originalRecords so that, when indexDocuments() builds FlowFiles for failures, it can write out an untouched version of the input rather than the mutated version, and without re-reading the original FlowFile.

Contributor:
Thanks for describing the background, that makes sense.
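
To make the reuse issue described above concrete, here is a small standalone sketch (not part of the PR). The schema and field name are invented for illustration, and the plain map copy stands in for the PR's cloneRecord, which additionally deep-clones nested values.

import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RecordReuseSketch {
    public static void main(final String[] args) {
        // Hypothetical single-field schema, for illustration only.
        final RecordSchema schema = new SimpleRecordSchema(List.of(
                new RecordField("id", RecordFieldType.STRING.getDataType())));

        // Keeping only a reference: the "original" sees every later mutation.
        final Record shared = new MapRecord(schema, new LinkedHashMap<>(Map.of("id", "1")));
        final Record originalByReference = shared;
        shared.setValue("id", "mutated");
        System.out.println(originalByReference.getValue("id")); // prints "mutated"

        // Copying first (what cloneRecord does, plus deep-cloning of nested
        // values): the original stays untouched when the working copy mutates.
        final Record working = new MapRecord(schema, new LinkedHashMap<>(Map.of("id", "1")));
        final Record originalCopy = new MapRecord(schema, new LinkedHashMap<>(working.toMap()));
        working.setValue("id", "mutated");
        System.out.println(originalCopy.getValue("id")); // prints "1"
    }
}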


addOperation(operationList, processedRecord, indexOperationParameters, indices, types);
processedRecords.add(processedRecord);
originalRecords.add(originalRecord);

if (operationList.size() == indexOperationParameters.getBatchSize() || !recordSet.isAnotherRecord()) {
operate(operationList, originals, reader, session, input, indexOperationParameters, resultRecords, errorRecords, successfulRecords, batches);
operate(operationList, processedRecords, originalRecords, recordSchema, session, input, indexOperationParameters, resultRecords, errorRecords, successfulRecords);
batches++;
}
}

if (!operationList.isEmpty()) {
operate(operationList, originals, reader, session, input, indexOperationParameters, resultRecords, errorRecords, successfulRecords, batches);
operate(operationList, processedRecords, originalRecords, recordSchema, session, input, indexOperationParameters, resultRecords, errorRecords, successfulRecords);
batches++;
}
} catch (final ElasticsearchException ese) {
@@ -508,20 +517,21 @@ private void addOperation(final List<IndexOperationRequest> operationList, final
operationList.add(new IndexOperationRequest(index, type, id, contentMap, indexOp, script, scriptedUpsert, dynamicTemplates, bulkHeaderFields));
}

private void operate(final List<IndexOperationRequest> operationList, final List<Record> originals, final RecordReader reader,
final ProcessSession session, final FlowFile input, final IndexOperationParameters indexOperationParameters,
final List<FlowFile> resultRecords, final AtomicLong erroredRecords, final AtomicLong successfulRecords, final int batch)
private void operate(final List<IndexOperationRequest> operationList, final List<Record> processedRecords, final List<Record> originalRecords,
final RecordSchema recordSchema, final ProcessSession session, final FlowFile input, final IndexOperationParameters indexOperationParameters,
final List<FlowFile> resultRecords, final AtomicLong erroredRecords, final AtomicLong successfulRecords)
throws IOException, SchemaNotFoundException, MalformedRecordException {

final BulkOperation bundle = new BulkOperation(operationList, originals, reader.getSchema());
final ResponseDetails responseDetails = indexDocuments(bundle, session, input, indexOperationParameters, batch);
final BulkOperation bundle = new BulkOperation(operationList, processedRecords, recordSchema);
final ResponseDetails responseDetails = indexDocuments(bundle, originalRecords, session, input, indexOperationParameters);

successfulRecords.getAndAdd(responseDetails.successCount());
erroredRecords.getAndAdd(responseDetails.errorCount());
resultRecords.addAll(responseDetails.outputs().values().stream().map(Output::getFlowFile).toList());

operationList.clear();
originals.clear();
processedRecords.clear();
originalRecords.clear();
}

private void removeResultRecordFlowFiles(final List<FlowFile> results, final ProcessSession session) {
@@ -532,8 +542,8 @@ private void removeResultRecordFlowFiles(final List<FlowFile> results, final Pro
results.clear();
}

private ResponseDetails indexDocuments(final BulkOperation bundle, final ProcessSession session, final FlowFile input,
final IndexOperationParameters indexOperationParameters, final int batch)
private ResponseDetails indexDocuments(final BulkOperation bundle, final List<Record> originalRecords, final ProcessSession session, final FlowFile input,
final IndexOperationParameters indexOperationParameters)
throws IOException, SchemaNotFoundException, MalformedRecordException {
final IndexOperationResponse response = clientService.get().bulk(bundle.getOperationList(), indexOperationParameters.getElasticsearchRequestOptions());

@@ -546,16 +556,7 @@ private ResponseDetails indexDocuments(final BulkOperation bundle, final Process
final int numSuccessful = response.getItems() == null ? 0 : response.getItems().size() - numErrors;
final Map<String, Output> outputs = new HashMap<>();

try (final InputStream inStream = session.read(input);
final RecordReader inputReader = readerFactory.createRecordReader(input, inStream, getLogger())) {

// if there are errors present, skip through the input FlowFile to the current batch of records
if (numErrors > 0) {
for (int r = 0; r < batch * indexOperationParameters.getBatchSize(); r++) {
inputReader.nextRecord();
}
}

try {
for (int o = 0; o < bundle.getOriginalRecords().size(); o++) {
final String type;
final Relationship relationship;
@@ -574,18 +575,14 @@ private ResponseDetails indexDocuments(final BulkOperation bundle, final Process
} else {
type = OUTPUT_TYPE_ERROR;
}
outputRecord = inputReader.nextRecord();
outputRecord = originalRecords.get(o);
recordSchema = outputRecord.getSchema();
} else {
relationship = REL_SUCCESSFUL;
error = null;
type = OUTPUT_TYPE_SUCCESS;
outputRecord = bundle.getOriginalRecords().get(o);
recordSchema = bundle.getSchema();
// skip the associated Input Record for this successful Record
if (numErrors > 0) {
inputReader.nextRecord();
}
}
final Output output = getOutputByType(outputs, type, session, relationship, input, recordSchema);
output.write(outputRecord, error);
@@ -594,7 +591,7 @@ private ResponseDetails indexDocuments(final BulkOperation bundle, final Process
for (final Output output : outputs.values()) {
output.transfer(session);
}
} catch (final IOException | SchemaNotFoundException | MalformedRecordException ex) {
} catch (final IOException | SchemaNotFoundException ex) {
getLogger().error("Unable to write error/successful records", ex);
outputs.values().forEach(o -> {
try {
@@ -773,6 +770,47 @@ private Object coerceStringToLong(final String fieldName, final String stringVal
: stringValue;
}

private Record cloneRecord(final Record record) {
final Map<String, Object> valuesCopy = cloneValues(record.toMap());
return new MapRecord(record.getSchema(), valuesCopy, record.isTypeChecked(), record.isDropUnknownFields());
}

private Map<String, Object> cloneValues(final Map<String, Object> values) {
final Map<String, Object> cloned = new LinkedHashMap<>(values.size());
for (final Map.Entry<String, Object> entry : values.entrySet()) {
cloned.put(entry.getKey(), cloneValue(entry.getValue()));
}
return cloned;
}

private Object cloneValue(final Object value) {

Comment on lines +773 to +786

Contributor:
Instead of creating all these clone methods, it would be cleaner to use something like Apache Commons SerializationUtils#clone. The only thing you would have to make sure of, though, is that all of the data to be cloned implements the Serializable interface.

Contributor (Author):
In our case the records are MapRecord instances, and MapRecord (and other Record implementations) do not implement Serializable. Besides, Java serialization would add noticeable overhead on a hot path that can process large batches, whereas the current approach only walks the map/collection structures we actually store.

Contributor (@dan-s1, Sep 25, 2025):
Thanks for clarifying that. Could this cloning be reused? In other words, would it make sense to have a clone/deepClone method in the Record interface, implemented by the Record implementations?

Contributor (Author):
I guess it could, but I'm not sure this is really a pattern we would want to encourage. The scenario here is quite specific. Others may have a different opinion though.
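
To make the Serializable constraint mentioned above concrete, here is a small standalone sketch (not part of the PR), assuming Apache Commons Lang 3 is on the classpath. SerializationUtils.clone only accepts arguments typed as Serializable, which is why it works for a plain HashMap but, per the discussion above, cannot be applied to MapRecord.

import org.apache.commons.lang3.SerializationUtils;

import java.util.HashMap;

public class SerializationCloneSketch {
    public static void main(final String[] args) {
        // Works: HashMap implements Serializable (provided its contents do too).
        final HashMap<String, String> source = new HashMap<>();
        source.put("id", "1");
        final HashMap<String, String> copy = SerializationUtils.clone(source);
        System.out.println(copy); // prints {id=1}

        // Does not compile: MapRecord does not implement Serializable, so it
        // cannot be passed to SerializationUtils.clone(...).
        // final Record clone = SerializationUtils.clone(someMapRecord);
    }
}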

if (value instanceof Record recordValue) {
return cloneRecord(recordValue);
}
if (value instanceof Map<?, ?> mapValue) {
final Map<Object, Object> clonedMap = new LinkedHashMap<>(mapValue.size());
for (final Map.Entry<?, ?> entry : mapValue.entrySet()) {
clonedMap.put(entry.getKey(), cloneValue(entry.getValue()));
}
return clonedMap;
}
if (value instanceof Collection<?> collectionValue) {
final List<Object> clonedList = new ArrayList<>(collectionValue.size());
for (final Object element : collectionValue) {
clonedList.add(cloneValue(element));
}
return clonedList;
}
if (value instanceof Object[] arrayValue) {
final Object[] clonedArray = new Object[arrayValue.length];
for (int i = 0; i < arrayValue.length; i++) {
clonedArray[i] = cloneValue(arrayValue[i]);
}
return clonedArray;
}
return value;
}

private class Output {
private FlowFile flowFile;
private final RecordSetWriter writer;
@@ -28,6 +28,9 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.List;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

@@ -141,6 +144,30 @@ void testErrorRecordsOutputMultipleBatches() {
assertTrue(error2Content.contains("\"foo\":456"), error2Content);
}

@Test
void testMultipleBatchesWithoutRecursiveReads() {
final String json = generateRecordsJson(2000);

runner.setProperty(ElasticsearchRestProcessor.INDEX, "test-multiple-batches-no-recursive");
runner.setProperty(PutElasticsearchRecord.BATCH_SIZE, "100");
runner.setProperty(PutElasticsearchRecord.ID_RECORD_PATH, "/id");
runner.setAllowRecursiveReads(false);

runner.enqueue(json);
runner.run();

runner.assertTransferCount(ElasticsearchRestProcessor.REL_FAILURE, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 1);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERRORS, 0);

final List<MockFlowFile> successful = runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_SUCCESSFUL);
assertEquals(20, successful.size());
final int totalRecordCount = successful.stream()
.mapToInt(flowFile -> Integer.parseInt(flowFile.getAttribute("record.count")))
.sum();
assertEquals(2000, totalRecordCount);
}

@Test
void testUpdateError() {
final String json = """
@@ -200,4 +227,17 @@ void testScriptedUpsertError() {
assertTrue(errorContent.contains("\"id\":\"123\""), errorContent);
assertTrue(errorContent.contains("\"script\":{"), errorContent);
}

private String generateRecordsJson(final int recordCount) {
final StringBuilder builder = new StringBuilder(recordCount * 32);
builder.append('[');
for (int i = 0; i < recordCount; i++) {
builder.append("{\"id\":\"").append(i).append("\",\"value\":").append(i).append('}');
if (i < recordCount - 1) {
builder.append(',');
}
}
builder.append(']');
return builder.toString();
}
}
2 changes: 1 addition & 1 deletion nifi-extension-bundles/nifi-elasticsearch-bundle/pom.xml
@@ -108,7 +108,7 @@ language governing permissions and limitations under the License. -->
<profile>
<id>elasticsearch8</id>
<properties>
<elasticsearch_docker_image>8.18.2</elasticsearch_docker_image>
<elasticsearch_docker_image>8.19.4</elasticsearch_docker_image>
</properties>
</profile>
<profile>