From be23edb74af6ca87a2ecc2ef1c74fe8f8ba476ba Mon Sep 17 00:00:00 2001 From: David Kjerrumgaard Date: Thu, 7 Mar 2019 18:48:14 -0800 Subject: [PATCH] Added logic to handle non-exlusive subscriptions --- .../AbstractPulsarConsumerProcessor.java | 2 +- .../pulsar/pubsub/ConsumePulsar.java | 19 ++++++++++++++++--- .../pulsar/pubsub/TestConsumePulsar.java | 3 +++ .../pubsub/sync/TestSyncConsumePulsar.java | 4 ++++ 4 files changed, 24 insertions(+), 4 deletions(-) diff --git a/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/AbstractPulsarConsumerProcessor.java b/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/AbstractPulsarConsumerProcessor.java index faaa9c4..7eeae24 100644 --- a/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/AbstractPulsarConsumerProcessor.java +++ b/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/AbstractPulsarConsumerProcessor.java @@ -57,7 +57,7 @@ public abstract class AbstractPulsarConsumerProcessor extends AbstractProcessor { - static final AllowableValue EXCLUSIVE = new AllowableValue("Exclusive", "Exclusive", "There can be only 1 consumer on the same topic with the same subscription name"); + protected static final AllowableValue EXCLUSIVE = new AllowableValue("Exclusive", "Exclusive", "There can be only 1 consumer on the same topic with the same subscription name"); static final AllowableValue SHARED = new AllowableValue("Shared", "Shared", "Multiple consumer will be able to use the same subscription name and the messages"); static final AllowableValue FAILOVER = new AllowableValue("Failover", "Failover", "Multiple consumer will be able to use the same subscription name but only 1 consumer " + "will receive the messages. If that consumer disconnects, one of the other connected consumers will start receiving messages."); diff --git a/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsar.java b/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsar.java index fcfff62..c6bd48e 100644 --- a/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsar.java +++ b/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsar.java @@ -125,12 +125,17 @@ public Object call() throws Exception { } private void consume(Consumer consumer, ProcessContext context, ProcessSession session) throws PulsarClientException { + try { final int maxMessages = context.getProperty(CONSUMER_BATCH_SIZE).isSet() ? context.getProperty(CONSUMER_BATCH_SIZE) .evaluateAttributeExpressions().asInteger() : Integer.MAX_VALUE; final byte[] demarcatorBytes = context.getProperty(MESSAGE_DEMARCATOR).isSet() ? context.getProperty(MESSAGE_DEMARCATOR) .evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8) : null; + + // Cumulative acks are only permitted on Exclusive subscriptions. + final boolean exclusive = context.getProperty(SUBSCRIPTION_TYPE).getValue() + .equalsIgnoreCase(EXCLUSIVE.getValue()); FlowFile flowFile = session.create(); OutputStream out = session.write(flowFile); @@ -141,9 +146,12 @@ private void consume(Consumer consumer, ProcessContext context, ProcessS while (((msg = consumer.receive(0, TimeUnit.SECONDS)) != null) && loopCounter.get() < maxMessages) { try { - lastMsg = msg; loopCounter.incrementAndGet(); + + if (!exclusive) { + consumer.acknowledge(msg); + } // Skip empty messages, as they cause NPE's when we write them to the OutputStream if (msg.getValue() == null || msg.getValue().length < 1) { @@ -154,14 +162,18 @@ private void consume(Consumer consumer, ProcessContext context, ProcessS msgCount.getAndIncrement(); } catch (final IOException ioEx) { + getLogger().error("Unable to create flow file ", ioEx); session.rollback(); + if (exclusive) { + consumer.acknowledgeCumulative(lastMsg); + } return; } } - + IOUtils.closeQuietly(out); - if (lastMsg != null) { + if (exclusive && lastMsg != null) { consumer.acknowledgeCumulative(lastMsg); } @@ -177,6 +189,7 @@ private void consume(Consumer consumer, ProcessContext context, ProcessS } } catch (PulsarClientException e) { + getLogger().error("Error communicating with Apache Pulsar", e); context.yield(); session.rollback(); } diff --git a/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/TestConsumePulsar.java b/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/TestConsumePulsar.java index daea5ba..cad59b3 100644 --- a/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/TestConsumePulsar.java +++ b/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/TestConsumePulsar.java @@ -84,6 +84,7 @@ public void onStoppedTest() throws NoSuchMethodException, SecurityException, Pul runner.setProperty(ConsumePulsar.TOPICS, "foo"); runner.setProperty(ConsumePulsar.SUBSCRIPTION_NAME, "bar"); + runner.setProperty(ConsumePulsar.SUBSCRIPTION_TYPE, "Exclusive"); runner.run(10, true); runner.assertAllFlowFilesTransferred(ConsumePulsar.REL_SUCCESS); @@ -109,6 +110,7 @@ protected void batchMessages(String msg, String topic, String sub, boolean async runner.setProperty(ConsumePulsar.TOPICS, topic); runner.setProperty(ConsumePulsar.SUBSCRIPTION_NAME, sub); runner.setProperty(ConsumePulsar.CONSUMER_BATCH_SIZE, batchSize + ""); + runner.setProperty(ConsumePulsar.SUBSCRIPTION_TYPE, "Exclusive"); runner.setProperty(ConsumePulsar.MESSAGE_DEMARCATOR, "\n"); runner.run(1, true); @@ -146,6 +148,7 @@ protected void sendMessages(String msg, String topic, String sub, boolean async, runner.setProperty(ConsumePulsar.TOPICS, topic); runner.setProperty(ConsumePulsar.SUBSCRIPTION_NAME, sub); runner.setProperty(ConsumePulsar.CONSUMER_BATCH_SIZE, 1 + ""); + runner.setProperty(ConsumePulsar.SUBSCRIPTION_TYPE, "Exclusive"); runner.run(iterations, true); runner.assertAllFlowFilesTransferred(ConsumePulsar.REL_SUCCESS); diff --git a/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/sync/TestSyncConsumePulsar.java b/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/sync/TestSyncConsumePulsar.java index df7c0c8..14e49a2 100644 --- a/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/sync/TestSyncConsumePulsar.java +++ b/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/sync/TestSyncConsumePulsar.java @@ -41,6 +41,7 @@ public void nullMessageTest() throws PulsarClientException { runner.setProperty(ConsumePulsar.TOPICS, "foo"); runner.setProperty(ConsumePulsar.SUBSCRIPTION_NAME, "bar"); + runner.setProperty(ConsumePulsar.SUBSCRIPTION_TYPE, "Exclusive"); runner.run(); runner.assertAllFlowFilesTransferred(ConsumePulsar.REL_SUCCESS); @@ -57,6 +58,7 @@ public void pulsarClientExceptionTest() throws PulsarClientException { runner.setProperty(ConsumePulsar.TOPICS, "foo"); runner.setProperty(ConsumePulsar.SUBSCRIPTION_NAME, "bar"); + runner.setProperty(ConsumePulsar.SUBSCRIPTION_TYPE, "Exclusive"); runner.run(); runner.assertAllFlowFilesTransferred(ConsumePulsar.REL_SUCCESS); @@ -75,6 +77,7 @@ public void emptyMessageTest() throws PulsarClientException { runner.setProperty(ConsumePulsar.TOPICS, "foo"); runner.setProperty(ConsumePulsar.SUBSCRIPTION_NAME, "bar"); + runner.setProperty(ConsumePulsar.SUBSCRIPTION_TYPE, "Exclusive"); runner.run(); runner.assertAllFlowFilesTransferred(ConsumePulsar.REL_SUCCESS); @@ -111,6 +114,7 @@ public void onStoppedTest() throws PulsarClientException { runner.setProperty(ConsumePulsar.TOPICS, "foo"); runner.setProperty(ConsumePulsar.SUBSCRIPTION_NAME, "bar"); runner.setProperty(ConsumePulsar.CONSUMER_BATCH_SIZE, 1 + ""); + runner.setProperty(ConsumePulsar.SUBSCRIPTION_TYPE, "Exclusive"); runner.run(1, true); runner.assertAllFlowFilesTransferred(ConsumePulsar.REL_SUCCESS);