Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Publish with confirm listener added #56

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>io.appform.dropwizard.actors</groupId>
<artifactId>dropwizard-rabbitmq-actors</artifactId>
<version>2.0.28-1</version>
<version>3.0.0_SNAPSHOT</version>
<name>Dropwizard RabbitMQ Bundle</name>
<url>https://github.com/santanusinha/dropwizard-rabbitmq-actors</url>
<description>Provides actor abstraction on RabbitMQ for dropwizard based projects.</description>
Expand Down Expand Up @@ -92,6 +92,7 @@
<guava.version>31.0.1-jre</guava.version>
<test.container.version>1.0.6</test.container.version>
<okhttp.version>4.9.3</okhttp.version>
<mockito.version>4.4.0</mockito.version>
<amqp-client.version>5.14.1</amqp-client.version>
</properties>

Expand All @@ -112,6 +113,12 @@
<version>${lombok.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.dropwizard</groupId>
<artifactId>dropwizard-core</artifactId>
Expand Down Expand Up @@ -186,7 +193,7 @@
<exclusions>
<exclusion>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<artifactId>junit-dep</artifactId>
</exclusion>
</exclusions>
</dependency>
Expand All @@ -200,6 +207,11 @@
<artifactId>junit-vintage-engine</artifactId>
<version>5.8.1</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
import io.appform.dropwizard.actors.connectivity.strategy.SharedConnectionStrategy;
import io.appform.dropwizard.actors.exceptionhandler.ExceptionHandlingFactory;
import io.appform.dropwizard.actors.retry.RetryStrategyFactory;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.validation.constraints.NotNull;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
Expand Down Expand Up @@ -124,6 +127,11 @@ public final void publish(Message message, AMQP.BasicProperties properties) thro
publishActor().publish(message, properties);
}

public final List<Message> publishWithConfirmListener(List<Message> messages, AMQP.BasicProperties properties,
long timeout, @NotNull TimeUnit unit) throws Exception {
return publishActor().publishWithConfirmListener(messages, properties, timeout, unit);
}

public final long pendingMessagesCount() {
return publishActor().pendingMessagesCount();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@
import io.appform.dropwizard.actors.actor.DelayType;
import io.appform.dropwizard.actors.base.utils.NamingUtils;
import io.appform.dropwizard.actors.connectivity.RMQConnection;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.validation.constraints.NotNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomUtils;

Expand All @@ -24,7 +32,7 @@ public class UnmanagedPublisher<Message> {
private final ObjectMapper mapper;
private final String queueName;

private Channel publishChannel;
public Channel publishChannel;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this necessary?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reverted it, created a function to set publish channel


public UnmanagedPublisher(
String name,
Expand Down Expand Up @@ -106,17 +114,105 @@ public final long pendingSidelineMessagesCount() {
return Long.MAX_VALUE;
}

/**
* Note: Timeout is in MilliSeconds, Function take a list of message as input and return not ack msg as output
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are taking timeunit, so maybe this comment needs to change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

* @param messages : Messages to be published
* @param properties
* @param timeout : in MS timeout for waiting on countDownLatch
* @param unit : timeout unit
* @return : List of message nacked
* @throws Exception
*/
public List<Message> publishWithConfirmListener(List<Message> messages, AMQP.BasicProperties properties,
long timeout, @NotNull TimeUnit unit) throws Exception {
publishChannel.confirmSelect();
ConcurrentNavigableMap<Long, Message> outstandingConfirms = new ConcurrentSkipListMap<>();
List<Message> nackedMessages = new ArrayList<>();
CountDownLatch publishAckLatch = new CountDownLatch(messages.size());

publishChannel.addConfirmListener((sequenceNumber, multiple) -> {
messagesAck(sequenceNumber, multiple, outstandingConfirms, publishAckLatch);
}, (sequenceNumber, multiple) -> {
nackedMessages.addAll(messagesNack(sequenceNumber, multiple, outstandingConfirms, publishAckLatch));
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just assign the return value rather than this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because there can be multiple callback with multiple = true

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, my bad

});

String routingKey = NamingUtils.getRoutingKey(queueName, config);

long startTime = System.nanoTime();

for (Message message : messages) {
try {
outstandingConfirms.put(publishChannel.getNextPublishSeqNo(), message);
publishChannel.basicPublish(config.getExchange(), routingKey, properties,
mapper().writeValueAsBytes(message));
} catch (Exception e) {
log.error(String.format("Failed to publish Message : %s with exception %s", message, e));
publishAckLatch.countDown();
}
}

if (!publishAckLatch.await(unit.toMillis(timeout), TimeUnit.MILLISECONDS)) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is wrong with publishAckLatch.await(timeout, unit)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

log.error("Timed out waiting for publish acks");
}

long endTime = System.nanoTime();

log.info(String.format("Published %d messages with confirmListener in %d ms", messages.size() - outstandingConfirms.size(),
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We won't know how many messages were submitted, i.e. total messages.

Duration.ofNanos(startTime - endTime).toMillis()));
nackedMessages.addAll(outstandingConfirms.values());
return nackedMessages;
}


private void messagesAck(long sequenceNumber, boolean multiple, ConcurrentNavigableMap<Long, Message> outstandingConfirms, CountDownLatch publishAckLatch)
{
if (multiple) {
ConcurrentNavigableMap<Long, Message> confirmed = outstandingConfirms.headMap(
sequenceNumber, true
);
for(int i =0;i<confirmed.size();i++)
publishAckLatch.countDown();
confirmed.clear();
} else {
publishAckLatch.countDown();
outstandingConfirms.remove(sequenceNumber);
}
}

private List<Message> messagesNack(long sequenceNumber, boolean multiple, ConcurrentNavigableMap<Long, Message> outstandingConfirms, CountDownLatch publishAckLatch)
{
List<Message> nackedMessages = new ArrayList<>();
if(multiple == true)
{
ConcurrentNavigableMap<Long, Message> nacked = outstandingConfirms.headMap(
sequenceNumber, true
);
for(int i =0;i<nacked.size();i++)
publishAckLatch.countDown();
nackedMessages.addAll(nacked.values());
nacked.clear();
}
else
{
publishAckLatch.countDown();
nackedMessages.add(outstandingConfirms.get(sequenceNumber));
outstandingConfirms.remove(sequenceNumber);
}
return nackedMessages;
}


public void start() throws Exception {
final String exchange = config.getExchange();
final String dlx = config.getExchange() + "_SIDELINE";
this.publishChannel = connection.newChannel();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WHy is this change done?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reverting this change

if (config.isDelayed()) {
ensureDelayedExchange(exchange);
} else {
ensureExchange(exchange);
}
ensureExchange(dlx);

this.publishChannel = connection.newChannel();
connection.ensure(queueName + "_SIDELINE", queueName, dlx,
connection.rmqOpts(config));
if (config.isSharded()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package io.appform.dropwizard.actors.base.utils;

import io.appform.dropwizard.actors.actor.ActorConfig;
import io.appform.dropwizard.actors.utils.CommonUtils;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.RandomUtils;

@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class NamingUtils {
Expand All @@ -26,8 +28,21 @@ public static String prefixWithNamespace(String name) {
return String.format("%s.%s", namespace, name);
}

public static String getRoutingKey(String queueName, ActorConfig config) {
String routingKey = queueName;
if (config.isSharded()) {
int shardId = getShardId(config);
routingKey = getShardedQueueName(queueName, shardId);
}
return routingKey;
}

public static String getShardedQueueName(String queueName, int shardId) {
return queueName + "_" + shardId;
}

private static int getShardId(ActorConfig config) {
return RandomUtils.nextInt(0, config.getShardCount());
}

}
Loading