Skip to content

Commit

Permalink
Added logic to handle non-exlusive subscriptions
Browse files Browse the repository at this point in the history
  • Loading branch information
David Kjerrumgaard committed Mar 8, 2019
1 parent 7c2afa7 commit be23edb
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@

public abstract class AbstractPulsarConsumerProcessor<T> extends AbstractProcessor {

static final AllowableValue EXCLUSIVE = new AllowableValue("Exclusive", "Exclusive", "There can be only 1 consumer on the same topic with the same subscription name");
protected static final AllowableValue EXCLUSIVE = new AllowableValue("Exclusive", "Exclusive", "There can be only 1 consumer on the same topic with the same subscription name");
static final AllowableValue SHARED = new AllowableValue("Shared", "Shared", "Multiple consumer will be able to use the same subscription name and the messages");
static final AllowableValue FAILOVER = new AllowableValue("Failover", "Failover", "Multiple consumer will be able to use the same subscription name but only 1 consumer "
+ "will receive the messages. If that consumer disconnects, one of the other connected consumers will start receiving messages.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,17 @@ public Object call() throws Exception {
}

private void consume(Consumer<byte[]> consumer, ProcessContext context, ProcessSession session) throws PulsarClientException {

try {
final int maxMessages = context.getProperty(CONSUMER_BATCH_SIZE).isSet() ? context.getProperty(CONSUMER_BATCH_SIZE)
.evaluateAttributeExpressions().asInteger() : Integer.MAX_VALUE;

final byte[] demarcatorBytes = context.getProperty(MESSAGE_DEMARCATOR).isSet() ? context.getProperty(MESSAGE_DEMARCATOR)
.evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8) : null;

// Cumulative acks are only permitted on Exclusive subscriptions.
final boolean exclusive = context.getProperty(SUBSCRIPTION_TYPE).getValue()
.equalsIgnoreCase(EXCLUSIVE.getValue());

FlowFile flowFile = session.create();
OutputStream out = session.write(flowFile);
Expand All @@ -141,9 +146,12 @@ private void consume(Consumer<byte[]> consumer, ProcessContext context, ProcessS

while (((msg = consumer.receive(0, TimeUnit.SECONDS)) != null) && loopCounter.get() < maxMessages) {
try {

lastMsg = msg;
loopCounter.incrementAndGet();

if (!exclusive) {
consumer.acknowledge(msg);
}

// Skip empty messages, as they cause NPE's when we write them to the OutputStream
if (msg.getValue() == null || msg.getValue().length < 1) {
Expand All @@ -154,14 +162,18 @@ private void consume(Consumer<byte[]> consumer, ProcessContext context, ProcessS
msgCount.getAndIncrement();

} catch (final IOException ioEx) {
getLogger().error("Unable to create flow file ", ioEx);
session.rollback();
if (exclusive) {
consumer.acknowledgeCumulative(lastMsg);
}
return;
}
}

IOUtils.closeQuietly(out);

if (lastMsg != null) {
if (exclusive && lastMsg != null) {
consumer.acknowledgeCumulative(lastMsg);
}

Expand All @@ -177,6 +189,7 @@ private void consume(Consumer<byte[]> consumer, ProcessContext context, ProcessS
}

} catch (PulsarClientException e) {
getLogger().error("Error communicating with Apache Pulsar", e);
context.yield();
session.rollback();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public void onStoppedTest() throws NoSuchMethodException, SecurityException, Pul

runner.setProperty(ConsumePulsar.TOPICS, "foo");
runner.setProperty(ConsumePulsar.SUBSCRIPTION_NAME, "bar");
runner.setProperty(ConsumePulsar.SUBSCRIPTION_TYPE, "Exclusive");
runner.run(10, true);
runner.assertAllFlowFilesTransferred(ConsumePulsar.REL_SUCCESS);

Expand All @@ -109,6 +110,7 @@ protected void batchMessages(String msg, String topic, String sub, boolean async
runner.setProperty(ConsumePulsar.TOPICS, topic);
runner.setProperty(ConsumePulsar.SUBSCRIPTION_NAME, sub);
runner.setProperty(ConsumePulsar.CONSUMER_BATCH_SIZE, batchSize + "");
runner.setProperty(ConsumePulsar.SUBSCRIPTION_TYPE, "Exclusive");
runner.setProperty(ConsumePulsar.MESSAGE_DEMARCATOR, "\n");
runner.run(1, true);

Expand Down Expand Up @@ -146,6 +148,7 @@ protected void sendMessages(String msg, String topic, String sub, boolean async,
runner.setProperty(ConsumePulsar.TOPICS, topic);
runner.setProperty(ConsumePulsar.SUBSCRIPTION_NAME, sub);
runner.setProperty(ConsumePulsar.CONSUMER_BATCH_SIZE, 1 + "");
runner.setProperty(ConsumePulsar.SUBSCRIPTION_TYPE, "Exclusive");
runner.run(iterations, true);

runner.assertAllFlowFilesTransferred(ConsumePulsar.REL_SUCCESS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public void nullMessageTest() throws PulsarClientException {

runner.setProperty(ConsumePulsar.TOPICS, "foo");
runner.setProperty(ConsumePulsar.SUBSCRIPTION_NAME, "bar");
runner.setProperty(ConsumePulsar.SUBSCRIPTION_TYPE, "Exclusive");
runner.run();
runner.assertAllFlowFilesTransferred(ConsumePulsar.REL_SUCCESS);

Expand All @@ -57,6 +58,7 @@ public void pulsarClientExceptionTest() throws PulsarClientException {

runner.setProperty(ConsumePulsar.TOPICS, "foo");
runner.setProperty(ConsumePulsar.SUBSCRIPTION_NAME, "bar");
runner.setProperty(ConsumePulsar.SUBSCRIPTION_TYPE, "Exclusive");
runner.run();
runner.assertAllFlowFilesTransferred(ConsumePulsar.REL_SUCCESS);

Expand All @@ -75,6 +77,7 @@ public void emptyMessageTest() throws PulsarClientException {

runner.setProperty(ConsumePulsar.TOPICS, "foo");
runner.setProperty(ConsumePulsar.SUBSCRIPTION_NAME, "bar");
runner.setProperty(ConsumePulsar.SUBSCRIPTION_TYPE, "Exclusive");
runner.run();
runner.assertAllFlowFilesTransferred(ConsumePulsar.REL_SUCCESS);

Expand Down Expand Up @@ -111,6 +114,7 @@ public void onStoppedTest() throws PulsarClientException {
runner.setProperty(ConsumePulsar.TOPICS, "foo");
runner.setProperty(ConsumePulsar.SUBSCRIPTION_NAME, "bar");
runner.setProperty(ConsumePulsar.CONSUMER_BATCH_SIZE, 1 + "");
runner.setProperty(ConsumePulsar.SUBSCRIPTION_TYPE, "Exclusive");
runner.run(1, true);
runner.assertAllFlowFilesTransferred(ConsumePulsar.REL_SUCCESS);

Expand Down

0 comments on commit be23edb

Please sign in to comment.