diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPutElasticsearch.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPutElasticsearch.java
index 2956173baf54..2a73f16c3c2a 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPutElasticsearch.java
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPutElasticsearch.java
@@ -55,7 +55,7 @@ import java.util.stream.Stream;
 
 public abstract class AbstractPutElasticsearch extends AbstractProcessor implements ElasticsearchRestProcessor {
-    static final Relationship REL_ORIGINAL = new Relationship.Builder()
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder()
             .name("original")
             .description("All flowfiles that are sent to Elasticsearch without request failures go to this relationship.")
             .build();
diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
index 8602a76883b5..2bbb37010cd5 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
@@ -61,6 +61,7 @@ import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
 import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
 import org.apache.nifi.serialization.record.PushBackRecordSet;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordField;
@@ -79,6 +80,8 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Collection;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -420,22 +423,28 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         try (final InputStream inStream = session.read(input);
              final RecordReader reader = readerFactory.createRecordReader(input, inStream, getLogger())) {
             final PushBackRecordSet recordSet = new PushBackRecordSet(reader.createRecordSet());
+            final RecordSchema recordSchema = reader.getSchema();
             final List<IndexOperationRequest> operationList = new ArrayList<>();
-            final List<Record> originals = new ArrayList<>();
+            final List<Record> processedRecords = new ArrayList<>();
+            final List<Record> originalRecords = new ArrayList<>();
 
             Record record;
             while ((record = recordSet.next()) != null) {
-                addOperation(operationList, record, indexOperationParameters, indices, types);
-                originals.add(record);
+                final Record originalRecord = cloneRecord(record);
+                final Record processedRecord = cloneRecord(record);
+
+                addOperation(operationList, processedRecord, indexOperationParameters, indices, types);
+                processedRecords.add(processedRecord);
+                originalRecords.add(originalRecord);
 
                 if (operationList.size() == indexOperationParameters.getBatchSize() || !recordSet.isAnotherRecord()) {
-                    operate(operationList, originals, reader, session, input, indexOperationParameters, resultRecords, errorRecords, successfulRecords, batches);
+                    operate(operationList, processedRecords, originalRecords, recordSchema, session, input, indexOperationParameters, resultRecords, errorRecords, successfulRecords);
                     batches++;
                 }
             }
 
             if (!operationList.isEmpty()) {
-                operate(operationList, originals, reader, session, input, indexOperationParameters, resultRecords, errorRecords, successfulRecords, batches);
+                operate(operationList, processedRecords, originalRecords, recordSchema, session, input, indexOperationParameters, resultRecords, errorRecords, successfulRecords);
                 batches++;
             }
         } catch (final ElasticsearchException ese) {
@@ -508,20 +517,21 @@ private void addOperation(final List<IndexOperationRequest> operationList, final
         operationList.add(new IndexOperationRequest(index, type, id, contentMap, indexOp, script, scriptedUpsert, dynamicTemplates, bulkHeaderFields));
     }
 
-    private void operate(final List<IndexOperationRequest> operationList, final List<Record> originals, final RecordReader reader,
-                         final ProcessSession session, final FlowFile input, final IndexOperationParameters indexOperationParameters,
-                         final List<FlowFile> resultRecords, final AtomicLong erroredRecords, final AtomicLong successfulRecords, final int batch)
+    private void operate(final List<IndexOperationRequest> operationList, final List<Record> processedRecords, final List<Record> originalRecords,
+                         final RecordSchema recordSchema, final ProcessSession session, final FlowFile input, final IndexOperationParameters indexOperationParameters,
+                         final List<FlowFile> resultRecords, final AtomicLong erroredRecords, final AtomicLong successfulRecords)
             throws IOException, SchemaNotFoundException, MalformedRecordException {
 
-        final BulkOperation bundle = new BulkOperation(operationList, originals, reader.getSchema());
-        final ResponseDetails responseDetails = indexDocuments(bundle, session, input, indexOperationParameters, batch);
+        final BulkOperation bundle = new BulkOperation(operationList, processedRecords, recordSchema);
+        final ResponseDetails responseDetails = indexDocuments(bundle, originalRecords, session, input, indexOperationParameters);
 
         successfulRecords.getAndAdd(responseDetails.successCount());
         erroredRecords.getAndAdd(responseDetails.errorCount());
         resultRecords.addAll(responseDetails.outputs().values().stream().map(Output::getFlowFile).toList());
         operationList.clear();
-        originals.clear();
+        processedRecords.clear();
+        originalRecords.clear();
     }
 
     private void removeResultRecordFlowFiles(final List<FlowFile> results, final ProcessSession session) {
@@ -532,8 +542,8 @@ private void removeResultRecordFlowFiles(final List<FlowFile> results, final Pro
         results.clear();
     }
 
-    private ResponseDetails indexDocuments(final BulkOperation bundle, final ProcessSession session, final FlowFile input,
-                                            final IndexOperationParameters indexOperationParameters, final int batch)
+    private ResponseDetails indexDocuments(final BulkOperation bundle, final List<Record> originalRecords, final ProcessSession session, final FlowFile input,
+                                            final IndexOperationParameters indexOperationParameters)
             throws IOException, SchemaNotFoundException, MalformedRecordException {
         final IndexOperationResponse response = clientService.get().bulk(bundle.getOperationList(), indexOperationParameters.getElasticsearchRequestOptions());
@@ -546,16 +556,7 @@ private ResponseDetails indexDocuments(final BulkOperation bundle, final Process
         final int numSuccessful = response.getItems() == null ? 0 : response.getItems().size() - numErrors;
         final Map<String, Output> outputs = new HashMap<>();
 
-        try (final InputStream inStream = session.read(input);
-             final RecordReader inputReader = readerFactory.createRecordReader(input, inStream, getLogger())) {
-
-            // if there are errors present, skip through the input FlowFile to the current batch of records
-            if (numErrors > 0) {
-                for (int r = 0; r < batch * indexOperationParameters.getBatchSize(); r++) {
-                    inputReader.nextRecord();
-                }
-            }
-
+        try {
             for (int o = 0; o < bundle.getOriginalRecords().size(); o++) {
                 final String type;
                 final Relationship relationship;
@@ -574,7 +575,7 @@ private ResponseDetails indexDocuments(final BulkOperation bundle, final Process
                     } else {
                         type = OUTPUT_TYPE_ERROR;
                     }
-                    outputRecord = inputReader.nextRecord();
+                    outputRecord = originalRecords.get(o);
                     recordSchema = outputRecord.getSchema();
                 } else {
                     relationship = REL_SUCCESSFUL;
@@ -582,10 +583,6 @@ private ResponseDetails indexDocuments(final BulkOperation bundle, final Process
                     type = OUTPUT_TYPE_SUCCESS;
                     outputRecord = bundle.getOriginalRecords().get(o);
                     recordSchema = bundle.getSchema();
-                    // skip the associated Input Record for this successful Record
-                    if (numErrors > 0) {
-                        inputReader.nextRecord();
-                    }
                 }
                 final Output output = getOutputByType(outputs, type, session, relationship, input, recordSchema);
                 output.write(outputRecord, error);
@@ -594,7 +591,7 @@ private ResponseDetails indexDocuments(final BulkOperation bundle, final Process
             for (final Output output : outputs.values()) {
                 output.transfer(session);
             }
-        } catch (final IOException | SchemaNotFoundException | MalformedRecordException ex) {
+        } catch (final IOException | SchemaNotFoundException ex) {
             getLogger().error("Unable to write error/successful records", ex);
             outputs.values().forEach(o -> {
                 try {
@@ -773,6 +770,47 @@ private Object coerceStringToLong(final String fieldName, final String stringVal
             : stringValue;
     }
 
+    private Record cloneRecord(final Record record) {
+        final Map<String, Object> valuesCopy = cloneValues(record.toMap());
+        return new MapRecord(record.getSchema(), valuesCopy, record.isTypeChecked(), record.isDropUnknownFields());
+    }
+
+    private Map<String, Object> cloneValues(final Map<String, Object> values) {
+        final Map<String, Object> cloned = new LinkedHashMap<>(values.size());
+        for (final Map.Entry<String, Object> entry : values.entrySet()) {
+            cloned.put(entry.getKey(), cloneValue(entry.getValue()));
+        }
+        return cloned;
+    }
+
+    private Object cloneValue(final Object value) {
+        if (value instanceof Record recordValue) {
+            return cloneRecord(recordValue);
+        }
+        if (value instanceof Map<?, ?> mapValue) {
+            final Map<Object, Object> clonedMap = new LinkedHashMap<>(mapValue.size());
+            for (final Map.Entry<?, ?> entry : mapValue.entrySet()) {
+                clonedMap.put(entry.getKey(), cloneValue(entry.getValue()));
+            }
+            return clonedMap;
+        }
+        if (value instanceof Collection<?> collectionValue) {
+            final List<Object> clonedList = new ArrayList<>(collectionValue.size());
+            for (final Object element : collectionValue) {
+                clonedList.add(cloneValue(element));
+            }
+            return clonedList;
+        }
+        if (value instanceof Object[] arrayValue) {
+            final Object[] clonedArray = new Object[arrayValue.length];
+            for (int i = 0; i < arrayValue.length; i++) {
+                clonedArray[i] = cloneValue(arrayValue[i]);
+            }
+            return clonedArray;
+        }
+        return value;
+    }
+
     private class Output {
         private FlowFile flowFile;
         private final RecordSetWriter writer;
diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/integration/PutElasticsearchRecord_IT.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/integration/PutElasticsearchRecord_IT.java
index 4eab3b3f119f..e1e90064bcbd 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/integration/PutElasticsearchRecord_IT.java
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/integration/PutElasticsearchRecord_IT.java
@@ -28,6 +28,9 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -141,6 +144,30 @@ void testErrorRecordsOutputMultipleBatches() {
         assertTrue(error2Content.contains("\"foo\":456"), error2Content);
     }
 
+    @Test
+    void testMultipleBatchesWithoutRecursiveReads() {
+        final String json = generateRecordsJson(2000);
+
+        runner.setProperty(ElasticsearchRestProcessor.INDEX, "test-multiple-batches-no-recursive");
+        runner.setProperty(PutElasticsearchRecord.BATCH_SIZE, "100");
+        runner.setProperty(PutElasticsearchRecord.ID_RECORD_PATH, "/id");
+        runner.setAllowRecursiveReads(false);
+
+        runner.enqueue(json);
+        runner.run();
+
+        runner.assertTransferCount(ElasticsearchRestProcessor.REL_FAILURE, 0);
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 1);
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_ERRORS, 0);
+
+        final List<MockFlowFile> successful = runner.getFlowFilesForRelationship(AbstractPutElasticsearch.REL_SUCCESSFUL);
+        assertEquals(20, successful.size());
+        final int totalRecordCount = successful.stream()
+                .mapToInt(flowFile -> Integer.parseInt(flowFile.getAttribute("record.count")))
+                .sum();
+        assertEquals(2000, totalRecordCount);
+    }
+
     @Test
     void testUpdateError() {
         final String json = """
@@ -200,4 +227,17 @@ void testScriptedUpsertError() {
         assertTrue(errorContent.contains("\"id\":\"123\""), errorContent);
         assertTrue(errorContent.contains("\"script\":{"), errorContent);
     }
+
+    private String generateRecordsJson(final int recordCount) {
+        final StringBuilder builder = new StringBuilder(recordCount * 32);
+        builder.append('[');
+        for (int i = 0; i < recordCount; i++) {
+            builder.append("{\"id\":\"").append(i).append("\",\"value\":").append(i).append('}');
+            if (i < recordCount - 1) {
+                builder.append(',');
+            }
+        }
+        builder.append(']');
+        return builder.toString();
+    }
 }
diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/pom.xml b/nifi-extension-bundles/nifi-elasticsearch-bundle/pom.xml
index dad1fd152420..b2ab74b62c5a 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/pom.xml
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/pom.xml
@@ -108,7 +108,7 @@ language governing permissions and limitations under the License. -->
             elasticsearch8
-            8.18.2
+            8.19.4