
Conversation

pvillard31 (Contributor)

Summary

NIFI-15001 - Fix PutElasticsearchRecord with recursive reads

When PutElasticsearchRecord processed FlowFiles with batch sizes smaller than the record count, it re-read the FlowFile to build error batches. With recursive reads disabled (the default), the second session.read() triggered IllegalStateException: FlowFile … already in use, so entire FlowFiles were routed to failure.

The change keeps processing single-pass: every record is cloned exactly once when read from the RecordReader. The mutated copy is used for the Elasticsearch _bulk request, while an untouched clone is retained for provenance and error routing. This eliminates the recursive read and keeps RecordPath mutations from leaking into error output.
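The single-pass pattern described above can be sketched with plain maps standing in for NiFi's Record/MapRecord types — a hypothetical illustration of the approach, not the actual processor code (the `@index` field and method names are made up):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration: each record is copied once at read time; the copy is
// mutated for the _bulk request while the untouched original is kept
// for provenance and error routing. No second session.read() is needed.
public class SinglePassClone {

    // Shallow one-level copy for brevity; the real change walks nested values.
    static Map<String, Object> cloneRecord(Map<String, Object> record) {
        return new LinkedHashMap<>(record);
    }

    // Returns the mutated copy used for the bulk request; the caller keeps
    // the original for error output.
    static Map<String, Object> prepareForBulk(Map<String, Object> record) {
        Map<String, Object> copy = cloneRecord(record);
        copy.put("@index", "my-index"); // stand-in for a RecordPath mutation
        return copy;
    }

    // Self-check: the mutation is visible on the copy only.
    static boolean demo() {
        Map<String, Object> original = new LinkedHashMap<>();
        original.put("id", 1);
        Map<String, Object> bulk = prepareForBulk(original);
        return bulk.containsKey("@index") && !original.containsKey("@index");
    }
}
```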

Tracking

Please complete the following tracking steps prior to pull request creation.

Issue Tracking

Pull Request Tracking

  • Pull Request title starts with Apache NiFi Jira issue number, such as NIFI-00000
  • Pull Request commit message starts with Apache NiFi Jira issue number, such as NIFI-00000

Pull Request Formatting

  • Pull Request based on current revision of the main branch
  • Pull Request refers to a feature branch with one commit containing changes

Verification

Please indicate the verification steps performed prior to pull request creation.

Build

  • Build completed using ./mvnw clean install -P contrib-check
    • JDK 21
    • JDK 25

Licensing

  • New dependencies are compatible with the Apache License 2.0 according to the License Policy
  • New dependencies are documented in applicable LICENSE and NOTICE files

Documentation

  • Documentation formatting appears as expected in rendered files

Comment on lines +773 to +786
    private Record cloneRecord(final Record record) {
        final Map<String, Object> valuesCopy = cloneValues(record.toMap());
        return new MapRecord(record.getSchema(), valuesCopy, record.isTypeChecked(), record.isDropUnknownFields());
    }

    private Map<String, Object> cloneValues(final Map<String, Object> values) {
        final Map<String, Object> cloned = new LinkedHashMap<>(values.size());
        for (final Map.Entry<String, Object> entry : values.entrySet()) {
            cloned.put(entry.getKey(), cloneValue(entry.getValue()));
        }
        return cloned;
    }

    private Object cloneValue(final Object value) {
Contributor

Instead of creating all these clone methods, it would be cleaner to use something like Apache Commons SerializationUtils#clone. The only thing you would have to make sure though is all of the data to be cloned implements the Serializable interface.
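For context on this suggestion, the following is a minimal stdlib sketch of the serialization round-trip that Commons Lang's SerializationUtils#clone performs internally (not the library's actual source); as the comment notes, it only works when every object in the graph implements Serializable:

```java
import java.io.*;

// Deep clone by serializing the object graph to bytes and reading it back.
// Simple and generic, but adds serialization overhead on every call and
// requires the whole graph to be Serializable.
public class SerializationClone {

    @SuppressWarnings("unchecked")
    static <T extends Serializable> T deepClone(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Clone failed", e);
        }
    }

    // Self-check: the clone is a distinct object with equal contents.
    static boolean demo() {
        java.util.HashMap<String, Integer> original = new java.util.HashMap<>();
        original.put("count", 42);
        java.util.HashMap<String, Integer> copy = deepClone(original);
        return copy != original && copy.equals(original);
    }
}
```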

Contributor Author

In our case the records are MapRecord instances, and MapRecord (and other Record implementations) do not implement Serializable. Besides, Java serialization would add noticeable overhead on a hot path that can process large batches, whereas the current approach only walks the map/collection structures we actually store.
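The structural walk mentioned here can be sketched as follows — a hypothetical illustration with plain JDK types, mirroring the method names in the diff but not the actual NiFi dispatch logic:

```java
import java.util.*;

// Recursively copy only the mutable containers (maps, lists, arrays);
// scalars such as String and boxed numbers are immutable and can be shared.
public class ValueClone {

    static Object cloneValue(Object value) {
        if (value instanceof Map<?, ?> map) {
            Map<Object, Object> copy = new LinkedHashMap<>(map.size());
            for (Map.Entry<?, ?> e : map.entrySet()) {
                copy.put(e.getKey(), cloneValue(e.getValue()));
            }
            return copy;
        }
        if (value instanceof List<?> list) {
            List<Object> copy = new ArrayList<>(list.size());
            for (Object element : list) {
                copy.add(cloneValue(element));
            }
            return copy;
        }
        if (value instanceof Object[] array) {
            Object[] copy = new Object[array.length];
            for (int i = 0; i < array.length; i++) {
                copy[i] = cloneValue(array[i]);
            }
            return copy;
        }
        return value; // immutable scalar: safe to share
    }

    // Self-check: mutating a nested list in the copy leaves the original intact.
    @SuppressWarnings("unchecked")
    static boolean demo() {
        Map<String, Object> nested = new LinkedHashMap<>();
        nested.put("tags", new ArrayList<>(List.of("a", "b")));
        Map<String, Object> copy = (Map<String, Object>) cloneValue(nested);
        ((List<Object>) copy.get("tags")).add("c");
        return ((List<?>) nested.get("tags")).size() == 2;
    }
}
```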

Contributor
@dan-s1 dan-s1 · Sep 25, 2025

Thanks for clarifying that. Could this cloning be reused? In other words, would it make sense to have a clone/deepClone method in the Record interface, implemented by each Record implementation?

Contributor Author

I guess it could, but I'm not sure this is really a pattern we would want to encourage. The scenario here is quite specific. Others may have a different opinion though.
