Skip to content

Commit

Permalink
NIFI-7085 add flowFile batching to ConsumeJMS and PublishJMS
Browse files Browse the repository at this point in the history
This closes apache#8584

Load some configuration in onScheduled instead of inside the per-FlowFile loop; reinstate and fix a failing integration test.
  • Loading branch information
mosermw authored and joewitt committed Oct 18, 2024
1 parent 1bcc61e commit 80e8893
Show file tree
Hide file tree
Showing 8 changed files with 169 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,14 @@ public abstract class AbstractJMSProcessor<T extends JMSWorker> extends Abstract
.identifiesControllerService(JMSConnectionFactoryProviderDefinition.class)
.build();

/**
 * Required "Maximum Batch Size" property shared by the JMS processors: the upper bound on the
 * number of messages published or consumed in a single invocation of the processor.
 * Defaults to {@code 1}.
 * NOTE(review): the validator arguments presumably mean the inclusive range 1..10,000 - confirm
 * against StandardValidators.createLongValidator.
 */
static final PropertyDescriptor MAX_BATCH_SIZE = new PropertyDescriptor.Builder()
.name("Maximum Batch Size")
.description("The maximum number of messages to publish or consume in each invocation of the processor.")
.required(true)
.defaultValue("1")
.addValidator(StandardValidators.createLongValidator(1, 10_000, true))
.build();

static final List<PropertyDescriptor> JNDI_JMS_CF_PROPERTIES = Collections.unmodifiableList(
JndiJmsConnectionFactoryProperties.getPropertyDescriptors().stream()
.map(pd -> new PropertyDescriptor.Builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.nifi.jms.processors.ioconcept.writer.FlowFileWriterCallback;
import org.apache.nifi.jms.processors.ioconcept.writer.record.OutputStrategy;
import org.apache.nifi.jms.processors.ioconcept.writer.record.RecordWriter;
import org.apache.nifi.migration.PropertyConfiguration;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
Expand Down Expand Up @@ -251,6 +252,7 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
_propertyDescriptors.add(SHARED_SUBSCRIBER);
_propertyDescriptors.add(SUBSCRIPTION_NAME);
_propertyDescriptors.add(TIMEOUT);
_propertyDescriptors.add(MAX_BATCH_SIZE);
_propertyDescriptors.add(ERROR_QUEUE);

_propertyDescriptors.add(RECORD_READER);
Expand All @@ -268,6 +270,17 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
relationships = Collections.unmodifiableSet(_relationships);
}

/**
 * Migrates configuration saved before the Maximum Batch Size property existed.
 * <p>
 * After delegating to the superclass, a configuration that does not yet carry
 * {@code MAX_BATCH_SIZE} but has a record reader set is given a batch size of "10000".
 * NOTE(review): presumably this preserves the earlier fixed per-FlowFile message limit for
 * record-oriented flows - confirm. All other configurations are left untouched.
 *
 * @param config the property configuration being migrated
 */
@Override
public void migrateProperties(PropertyConfiguration config) {
    super.migrateProperties(config);

    // An existing batch size always wins; never overwrite it.
    if (config.hasProperty(MAX_BATCH_SIZE)) {
        return;
    }

    final boolean recordReaderConfigured = config.isPropertySet(BASE_RECORD_READER);
    if (recordReaderConfigured) {
        config.setProperty(MAX_BATCH_SIZE, "10000");
    }
}

private static boolean isDurableSubscriber(final ProcessContext context) {
final Boolean durableBoolean = context.getProperty(DURABLE_SUBSCRIBER).evaluateAttributeExpressions().asBoolean();
return durableBoolean == null ? false : durableBoolean;
Expand Down Expand Up @@ -322,9 +335,9 @@ protected void rendezvousWithJms(final ProcessContext context, final ProcessSess

try {
if (context.getProperty(RECORD_READER).isSet()) {
processMessageSet(context, processSession, consumer, destinationName, errorQueueName, durable, shared, subscriptionName, messageSelector, charset);
processMessagesAsRecords(context, processSession, consumer, destinationName, errorQueueName, durable, shared, subscriptionName, messageSelector, charset);
} else {
processSingleMessage(processSession, consumer, destinationName, errorQueueName, durable, shared, subscriptionName, messageSelector, charset);
processMessages(context, processSession, consumer, destinationName, errorQueueName, durable, shared, subscriptionName, messageSelector, charset);
}
} catch (Exception e) {
getLogger().error("Error while trying to process JMS message", e);
Expand All @@ -334,26 +347,25 @@ protected void rendezvousWithJms(final ProcessContext context, final ProcessSess
}
}

private void processSingleMessage(ProcessSession processSession, JMSConsumer consumer, String destinationName, String errorQueueName,
boolean durable, boolean shared, String subscriptionName, String messageSelector, String charset) {

consumer.consumeSingleMessage(destinationName, errorQueueName, durable, shared, subscriptionName, messageSelector, charset, response -> {
if (response == null) {
return;
}

try {
final FlowFile flowFile = createFlowFileFromMessage(processSession, destinationName, response);

processSession.getProvenanceReporter().receive(flowFile, destinationName);
processSession.transfer(flowFile, REL_SUCCESS);
processSession.commitAsync(
() -> withLog(() -> acknowledge(response)),
__ -> withLog(() -> response.reject()));
} catch (final Throwable t) {
response.reject();
throw t;
}
/**
 * Consumes a batch of JMS messages and turns each one into its own FlowFile.
 * <p>
 * The batch limit is read from {@code MAX_BATCH_SIZE}. For every received message a FlowFile is
 * created, a provenance RECEIVE event is reported, the FlowFile is routed to
 * {@code REL_SUCCESS} and the session is committed asynchronously: the message is acknowledged
 * once the commit succeeds and rejected if the commit fails. If anything goes wrong while
 * handling a message, that message is rejected and the error is rethrown to the caller.
 */
private void processMessages(ProcessContext context, ProcessSession processSession, JMSConsumer consumer, String destinationName, String errorQueueName,
        boolean durable, boolean shared, String subscriptionName, String messageSelector, String charset) {

    final int maxMessages = context.getProperty(MAX_BATCH_SIZE).asInteger();
    consumer.consumeMessageSet(destinationName, errorQueueName, durable, shared, subscriptionName, messageSelector, charset, maxMessages,
            receivedMessages -> receivedMessages.forEach(message -> {
                try {
                    final FlowFile created = createFlowFileFromMessage(processSession, destinationName, message);

                    processSession.getProvenanceReporter().receive(created, destinationName);
                    processSession.transfer(created, REL_SUCCESS);

                    // Acknowledge only after the session commit succeeded; reject when it failed.
                    processSession.commitAsync(
                            () -> withLog(() -> acknowledge(message)),
                            commitFailure -> withLog(() -> message.reject()));
                } catch (final Throwable t) {
                    // Make sure the message is rejected before the failure propagates.
                    message.reject();
                    throw t;
                }
            }));
}

Expand All @@ -369,9 +381,10 @@ private FlowFile createFlowFileFromMessage(ProcessSession processSession, String
return processSession.putAllAttributes(flowFile, attributes);
}

private void processMessageSet(ProcessContext context, ProcessSession session, JMSConsumer consumer, String destinationName, String errorQueueName,
boolean durable, boolean shared, String subscriptionName, String messageSelector, String charset) {
private void processMessagesAsRecords(ProcessContext context, ProcessSession session, JMSConsumer consumer, String destinationName, String errorQueueName,
boolean durable, boolean shared, String subscriptionName, String messageSelector, String charset) {

int batchSize = context.getProperty(MAX_BATCH_SIZE).asInteger();
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
final OutputStrategy outputStrategy = OutputStrategy.valueOf(context.getProperty(OUTPUT_STRATEGY).getValue());
Expand All @@ -385,7 +398,7 @@ private void processMessageSet(ProcessContext context, ProcessSession session, J
getLogger()
);

consumer.consumeMessageSet(destinationName, errorQueueName, durable, shared, subscriptionName, messageSelector, charset, jmsResponses -> {
consumer.consumeMessageSet(destinationName, errorQueueName, durable, shared, subscriptionName, messageSelector, charset, batchSize, jmsResponses -> {
flowFileWriter.write(session, jmsResponses, new FlowFileWriterCallback<>() {
@Override
public void onSuccess(FlowFile flowFile, List<JMSResponse> processedMessages, List<JMSResponse> failedMessages) {
Expand Down Expand Up @@ -477,7 +490,7 @@ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
* Use provided clientId for non shared durable consumers, if not set
* always a different value as defined in {@link AbstractJMSProcessor#setClientId(ProcessContext, SingleConnectionFactory)}.
* </p>
* See {@link Session#createDurableConsumer(javax.jms.Topic, String, String, boolean)},
* See {@link Session#createDurableConsumer(jakarta.jms.Topic, String, String, boolean)},
* in particular the following part: <i>An unshared durable subscription is
* identified by a name specified by the client and by the client identifier,
* which must be set. An application which subsequently wishes to create
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@
*/
class JMSConsumer extends JMSWorker {

private final static int MAX_MESSAGES_PER_FLOW_FILE = 10000;

JMSConsumer(CachingConnectionFactory connectionFactory, JmsTemplate jmsTemplate, ComponentLog logger) {
super(connectionFactory, jmsTemplate, logger);
logger.debug("Created Message Consumer for '{}'", jmsTemplate);
Expand Down Expand Up @@ -88,37 +86,19 @@ private MessageConsumer createMessageConsumer(final Session session, final Strin
}
}

/**
 * Receives at most one message from the broker and passes it to {@code singleMessageConsumer}.
 * The callback is not invoked when no message was received. Acknowledging (or rejecting) the
 * received message is the callback's responsibility.
 */
public void consumeSingleMessage(final String destinationName, String errorQueueName, final boolean durable, final boolean shared, final String subscriptionName, final String messageSelector,
        final String charset, final Consumer<JMSResponse> singleMessageConsumer) {
    doWithJmsTemplate(destinationName, durable, shared, subscriptionName, messageSelector, (session, messageConsumer) -> {
        final JMSResponse received = receiveMessage(session, messageConsumer, charset, errorQueueName);
        if (received == null) {
            return;
        }

        // Hand the JMSResponse over to the processor. The processor is responsible for
        // acknowledging the message (if Client Acknowledge) and for closing the Message
        // Consumer; both can be done through the acknowledge() or reject() methods of
        // the JMSResponse.
        singleMessageConsumer.accept(received);
    });
}

/**
* Receives a list of messages from the broker. It is the consumerCallback's responsibility to acknowledge the received message.
*/
public void consumeMessageSet(final String destinationName, String errorQueueName, final boolean durable, final boolean shared, final String subscriptionName, final String messageSelector,
final String charset, final Consumer<List<JMSResponse>> messageSetConsumer) {
final String charset, final int batchSize, final Consumer<List<JMSResponse>> messageSetConsumer) {
doWithJmsTemplate(destinationName, durable, shared, subscriptionName, messageSelector, new MessageReceiver() {
@Override
public void consume(Session session, MessageConsumer messageConsumer) throws JMSException {
final List<JMSResponse> jmsResponses = new ArrayList<>();
int batchCounter = 0;

JMSResponse response;
while ((response = receiveMessage(session, messageConsumer, charset, errorQueueName)) != null && batchCounter < MAX_MESSAGES_PER_FLOW_FILE) {
while (batchCounter < batchSize && (response = receiveMessage(session, messageConsumer, charset, errorQueueName)) != null) {
response.setBatchOrder(batchCounter);
jmsResponses.add(response);
batchCounter++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ private static Destination buildDestination(final Session session, final String
}

/**
* Implementations of this interface use {@link javax.jms.Message} methods to set strongly typed properties.
* Implementations of this interface use {@link jakarta.jms.Message} methods to set strongly typed properties.
*/
public interface JmsPropertySetter {
void setProperty(final Message message, final String name, final String value) throws JMSException, NumberFormatException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.RequiredPermission;
import org.apache.nifi.expression.ExpressionLanguageScope;
Expand Down Expand Up @@ -183,6 +184,7 @@ public class PublishJMS extends AbstractJMSProcessor<JMSPublisher> {
_propertyDescriptors.add(CHARSET);
_propertyDescriptors.add(ALLOW_ILLEGAL_HEADER_CHARS);
_propertyDescriptors.add(ATTRIBUTES_AS_HEADERS_REGEX);
_propertyDescriptors.add(MAX_BATCH_SIZE);

_propertyDescriptors.add(RECORD_READER);
_propertyDescriptors.add(RECORD_WRITER);
Expand All @@ -198,6 +200,22 @@ public class PublishJMS extends AbstractJMSProcessor<JMSPublisher> {
relationships = Collections.unmodifiableSet(_relationships);
}

// Configuration resolved once in onScheduled(...) and then read while publishing, so the
// property lookups and the regex compilation are not repeated for every FlowFile.
volatile Boolean allowIllegalChars;
volatile Pattern attributeHeaderPattern;
volatile RecordReaderFactory readerFactory;
volatile RecordSetWriterFactory writerFactory;

/**
 * Caches processor configuration when the processor is scheduled: the "allow illegal header
 * characters" flag, the compiled attributes-as-headers pattern, and the record reader and
 * writer factories.
 *
 * @param context the process context the property values are read from
 */
@OnScheduled
public void onScheduled(final ProcessContext context) {
    allowIllegalChars = context.getProperty(ALLOW_ILLEGAL_HEADER_CHARS).asBoolean();
    attributeHeaderPattern = Pattern.compile(context.getProperty(ATTRIBUTES_AS_HEADERS_REGEX).getValue());

    readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
    writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
}

/**
* Will construct JMS {@link Message} by extracting its body from the
* incoming {@link FlowFile}. {@link FlowFile} attributes that represent
Expand All @@ -211,30 +229,28 @@ public class PublishJMS extends AbstractJMSProcessor<JMSPublisher> {
*/
@Override
protected void rendezvousWithJms(ProcessContext context, ProcessSession processSession, JMSPublisher publisher) throws ProcessException {
FlowFile flowFile = processSession.get();
if (flowFile != null) {
final List<FlowFile> flowFiles = processSession.get(context.getProperty(MAX_BATCH_SIZE).asInteger());
if (flowFiles.isEmpty()) {
return;
}

flowFiles.forEach(flowFile -> {
try {
final String destinationName = context.getProperty(DESTINATION).evaluateAttributeExpressions(flowFile).getValue();
final String charset = context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue();
final Boolean allowIllegalChars = context.getProperty(ALLOW_ILLEGAL_HEADER_CHARS).asBoolean();
final String attributeHeaderRegex = context.getProperty(ATTRIBUTES_AS_HEADERS_REGEX).getValue();

final Map<String, String> attributesToSend = new HashMap<>();
// REGEX Attributes
final Pattern pattern = Pattern.compile(attributeHeaderRegex);
for (final Map.Entry<String, String> entry : flowFile.getAttributes().entrySet()) {
final String key = entry.getKey();
if (pattern.matcher(key).matches()) {
if (attributeHeaderPattern.matcher(key).matches()) {
if (allowIllegalChars || key.endsWith(".type") || (!key.contains("-") && !key.contains("."))) {
attributesToSend.put(key, flowFile.getAttribute(key));
}
}
}

if (context.getProperty(RECORD_READER).isSet()) {
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);

final FlowFileReader flowFileReader = new StateTrackingFlowFileReader(
getIdentifier(),
new RecordSupplier(readerFactory, writerFactory),
Expand Down Expand Up @@ -278,7 +294,7 @@ public void onFailure(FlowFile flowFile, int processedRecords, long transmission
} catch (Exception e) {
handleException(context, processSession, publisher, flowFile, e);
}
}
});
}

private void handleException(ProcessContext context, ProcessSession processSession, JMSPublisher publisher, FlowFile flowFile, Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,7 @@ public void testConsumeRecords() throws InitializationException {
TestRunner testRunner = initializeTestRunner(jmsTemplate.getConnectionFactory(), destination);
testRunner.setProperty(ConsumeJMS.RECORD_READER, createJsonRecordSetReaderService(testRunner));
testRunner.setProperty(ConsumeJMS.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
testRunner.setProperty(AbstractJMSProcessor.MAX_BATCH_SIZE, "10");

testRunner.run(1, false);

Expand Down Expand Up @@ -535,6 +536,7 @@ public void testConsumeMalformedRecords() throws InitializationException {
TestRunner testRunner = initializeTestRunner(jmsTemplate.getConnectionFactory(), destination);
testRunner.setProperty(ConsumeJMS.RECORD_READER, createJsonRecordSetReaderService(testRunner));
testRunner.setProperty(ConsumeJMS.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
testRunner.setProperty(AbstractJMSProcessor.MAX_BATCH_SIZE, "10");
testRunner.setRelationshipAvailable(ConsumeJMS.REL_PARSE_FAILURE);

testRunner.run(1, false);
Expand Down Expand Up @@ -568,6 +570,7 @@ public void testConsumeRecordsWithAppenderOutputStrategy() throws Initialization
TestRunner testRunner = initializeTestRunner(jmsTemplate.getConnectionFactory(), destination);
testRunner.setProperty(ConsumeJMS.RECORD_READER, createJsonRecordSetReaderService(testRunner));
testRunner.setProperty(ConsumeJMS.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
testRunner.setProperty(AbstractJMSProcessor.MAX_BATCH_SIZE, "10");
testRunner.setProperty(ConsumeJMS.OUTPUT_STRATEGY, OutputStrategy.USE_APPENDER.getValue());

testRunner.run(1, false);
Expand Down Expand Up @@ -609,6 +612,7 @@ public void testConsumeRecordsWithWrapperOutputStrategy() throws InitializationE
TestRunner testRunner = initializeTestRunner(jmsTemplate.getConnectionFactory(), destination);
testRunner.setProperty(ConsumeJMS.RECORD_READER, createJsonRecordSetReaderService(testRunner));
testRunner.setProperty(ConsumeJMS.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
testRunner.setProperty(AbstractJMSProcessor.MAX_BATCH_SIZE, "10");
testRunner.setProperty(ConsumeJMS.OUTPUT_STRATEGY, OutputStrategy.USE_WRAPPER.getValue());

testRunner.run(1, false);
Expand Down
Loading

0 comments on commit 80e8893

Please sign in to comment.