diff --git a/pom.xml b/pom.xml index 9b06be70..be311da7 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ io.appform.dropwizard.actors dropwizard-rabbitmq-actors - 2.0.28-1 + 3.0.2_Test Dropwizard RabbitMQ Bundle https://github.com/santanusinha/dropwizard-rabbitmq-actors Provides actor abstraction on RabbitMQ for dropwizard based projects. @@ -92,6 +92,7 @@ 31.0.1-jre 1.0.6 4.9.3 + 4.4.0 5.14.1 @@ -112,6 +113,12 @@ ${lombok.version} provided + + org.mockito + mockito-core + ${mockito.version} + test + io.dropwizard dropwizard-core @@ -186,7 +193,7 @@ junit - junit + junit-dep @@ -200,6 +207,11 @@ junit-vintage-engine 5.8.1 + + junit + junit + 4.13.2 + diff --git a/src/main/java/io/appform/dropwizard/actors/actor/UnmanagedBaseActor.java b/src/main/java/io/appform/dropwizard/actors/actor/UnmanagedBaseActor.java index 345ea873..d29d86a6 100644 --- a/src/main/java/io/appform/dropwizard/actors/actor/UnmanagedBaseActor.java +++ b/src/main/java/io/appform/dropwizard/actors/actor/UnmanagedBaseActor.java @@ -29,6 +29,9 @@ import io.appform.dropwizard.actors.connectivity.strategy.SharedConnectionStrategy; import io.appform.dropwizard.actors.exceptionhandler.ExceptionHandlingFactory; import io.appform.dropwizard.actors.retry.RetryStrategyFactory; +import java.util.List; +import java.util.concurrent.TimeUnit; +import javax.validation.constraints.NotNull; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.ToString; @@ -124,6 +127,11 @@ public final void publish(Message message, AMQP.BasicProperties properties) thro publishActor().publish(message, properties); } + public final List publishWithConfirmListener(List messages, AMQP.BasicProperties properties, + long timeout, @NotNull TimeUnit unit) throws Exception { + return publishActor().publishWithConfirmListener(messages, properties, timeout, unit); + } + public final long pendingMessagesCount() { return publishActor().pendingMessagesCount(); } diff --git a/src/main/java/io/appform/dropwizard/actors/base/UnmanagedPublisher.java b/src/main/java/io/appform/dropwizard/actors/base/UnmanagedPublisher.java index 5e3e3864..80ea0344 100644 --- a/src/main/java/io/appform/dropwizard/actors/base/UnmanagedPublisher.java +++ b/src/main/java/io/appform/dropwizard/actors/base/UnmanagedPublisher.java @@ -9,6 +9,14 @@ import io.appform.dropwizard.actors.actor.DelayType; import io.appform.dropwizard.actors.base.utils.NamingUtils; import io.appform.dropwizard.actors.connectivity.RMQConnection; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import javax.validation.constraints.NotNull; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.RandomUtils; @@ -106,6 +114,93 @@ public final long pendingSidelineMessagesCount() { return Long.MAX_VALUE; } + /** + * @param messages : Messages to be published + * @param properties + * @param timeout : in MS timeout for waiting on countDownLatch + * @param unit : timeout unit + * @return : List of message nacked + * @throws Exception + */ + public List publishWithConfirmListener(List messages, AMQP.BasicProperties properties, + long timeout, @NotNull TimeUnit unit) throws Exception { + publishChannel.confirmSelect(); + ConcurrentNavigableMap outstandingConfirms = new ConcurrentSkipListMap<>(); + List nackedMessages = new ArrayList<>(); + CountDownLatch publishAckLatch = new CountDownLatch(messages.size()); + + publishChannel.addConfirmListener((sequenceNumber, multiple) -> { + messagesAck(sequenceNumber, multiple, outstandingConfirms, publishAckLatch); + }, (sequenceNumber, multiple) -> { + nackedMessages.addAll(messagesNack(sequenceNumber, multiple, outstandingConfirms, publishAckLatch)); + }); + + + long startTime = System.nanoTime(); + + for (Message message : messages) { + try { + String routingKey = NamingUtils.getRoutingKey(queueName, config); + outstandingConfirms.put(publishChannel.getNextPublishSeqNo(), message); + publishChannel.basicPublish(config.getExchange(), routingKey, properties, + mapper().writeValueAsBytes(message)); + } catch (Exception e) { + log.error(String.format("Failed to publish Message : %s with exception %s", message, e)); + publishAckLatch.countDown(); + } + } + + if (!publishAckLatch.await(timeout, unit)) { + log.error("Timed out waiting for publish acks"); + } + + long endTime = System.nanoTime(); + + log.info(String.format("Published %d messages with confirmListener in %d ms. Total Messages : %d", messages.size() - outstandingConfirms.size(), + Duration.ofNanos(startTime - endTime).toMillis(), messages.size())); + nackedMessages.addAll(outstandingConfirms.values()); + return nackedMessages; + } + + + private void messagesAck(long sequenceNumber, boolean multiple, ConcurrentNavigableMap outstandingConfirms, CountDownLatch publishAckLatch) + { + if (multiple) { + ConcurrentNavigableMap confirmed = outstandingConfirms.headMap( + sequenceNumber, true + ); + for(int i =0;i messagesNack(long sequenceNumber, boolean multiple, ConcurrentNavigableMap outstandingConfirms, CountDownLatch publishAckLatch) + { + List nackedMessages = new ArrayList<>(); + if(multiple == true) + { + ConcurrentNavigableMap nacked = outstandingConfirms.headMap( + sequenceNumber, true + ); + for(int i =0;i messagePublisher; + + private ObjectMapper objectMapper = new ObjectMapper(); + + private RMQConnection rmqConnection = mock(RMQConnection.class); + + @Test + public void testConstructor() { + ActorConfig config = new ActorConfig(); + RMQConfig config1 = new RMQConfig(); + DirectExecutorService executorService = new DirectExecutorService(); + Environment environment = new Environment("Name"); + RMQConnection rmqConnection = new RMQConnection("Name", config1, executorService, environment, new TtlConfig()); + + com.fasterxml.jackson.databind.ObjectMapper objectMapper = new com.fasterxml.jackson.databind.ObjectMapper(); + UnmanagedPublisher actualUnmanagedPublisher = new UnmanagedPublisher<>("Name", config, rmqConnection, + objectMapper); + + assertSame(rmqConnection, actualUnmanagedPublisher.connection()); + assertSame(objectMapper, actualUnmanagedPublisher.mapper()); + } + + @Test + public void testPublishWithConfirmListener() throws Exception { + // Arrange + String message = "test message"; + List messages = Collections.singletonList(message); + AMQP.BasicProperties properties = MessageProperties.PERSISTENT_TEXT_PLAIN; + long timeout = 1; + TimeUnit unit = TimeUnit.SECONDS; + + // Mock confirm listener + ConfirmCallback ackConfirmListener = mock(ConfirmCallback.class); + doAnswer(invocation -> { + long sequenceNumber = invocation.getArgument(0); + boolean multiple = invocation.getArgument(1); + if (multiple) { + confirmMultiple(sequenceNumber); + } else { + confirmSingle(sequenceNumber); + } + return null; + }).when(ackConfirmListener).handle(anyLong(), anyBoolean()); + + ConfirmCallback nackConfirmListener = mock(ConfirmCallback.class); + doAnswer(invocation -> { + long sequenceNumber = invocation.getArgument(0); + boolean multiple = invocation.getArgument(1); + nack(sequenceNumber, multiple); + return null; + }).when(nackConfirmListener).handle(anyLong(), anyBoolean()); + + // Set up confirm select and confirm listener on publish channel + when(publishChannel.getNextPublishSeqNo()).thenReturn(1L, 2L); + when(publishChannel.isOpen()).thenReturn(true); + when(rmqConnection.newChannel()).thenReturn(publishChannel); + doAnswer(invocation -> { + ConfirmListener listener = invocation.getArgument(0); + listener.handleAck(1, false); + return null; + }).when(publishChannel).addConfirmListener(ackConfirmListener, nackConfirmListener); + + // Act + ActorConfig actorConfig = new ActorConfig(); + actorConfig.setExchange("test-exchange-1"); + messagePublisher = new UnmanagedPublisher<>("Name", actorConfig, rmqConnection, new ObjectMapper()); + assertSame(rmqConnection, messagePublisher.connection()); + messagePublisher.setPublishChannel(publishChannel); + List nackedMessages = messagePublisher.publishWithConfirmListener(messages, properties, timeout, unit); + verify(publishChannel, times(1)).getNextPublishSeqNo(); + publishChannel.basicPublish(eq(""), eq(""), same(properties), any()); + verify(publishChannel).confirmSelect(); + verify(publishChannel, times(1)).getNextPublishSeqNo(); + verify(publishChannel, times(1)).basicPublish(anyString(), anyString(), eq(properties), any(byte[].class)); + assertEquals(1, nackedMessages.size()); + } + + + @Test + public void testMultiplePublishWithConfirmListener() throws Exception { + // Arrange + String message = "test message"; + int numberOfMessage = 2; + List messages = new ArrayList<>(); + for(int i=0;i { + long sequenceNumber = invocation.getArgument(0); + boolean multiple = invocation.getArgument(1); + if (multiple) { + confirmMultiple(sequenceNumber); + } else { + confirmSingle(sequenceNumber); + } + return null; + }).when(ackConfirmListener).handle(anyLong(), anyBoolean()); + + ConfirmCallback nackConfirmListener = mock(ConfirmCallback.class); + doAnswer(invocation -> { + long sequenceNumber = invocation.getArgument(0); + boolean multiple = invocation.getArgument(1); + nack(sequenceNumber, multiple); + return null; + }).when(nackConfirmListener).handle(anyLong(), anyBoolean()); + + // Set up confirm select and confirm listener on publish channel + when(publishChannel.getNextPublishSeqNo()).thenReturn(1L, 2L); + when(publishChannel.isOpen()).thenReturn(true); + when(rmqConnection.newChannel()).thenReturn(publishChannel); + doAnswer(invocation -> { + ConfirmListener listener = invocation.getArgument(0); + listener.handleAck(1, false); + return null; + }).when(publishChannel).addConfirmListener(ackConfirmListener, nackConfirmListener); + + // Act + ActorConfig actorConfig = new ActorConfig(); + actorConfig.setExchange("test-exchange-1"); + messagePublisher = new UnmanagedPublisher<>("Name", actorConfig, rmqConnection, new ObjectMapper()); + assertSame(rmqConnection, messagePublisher.connection()); + messagePublisher.setPublishChannel(publishChannel); + List nackedMessages = messagePublisher.publishWithConfirmListener(messages, properties, timeout, unit); + verify(publishChannel, times(numberOfMessage)).getNextPublishSeqNo(); + publishChannel.basicPublish(eq(""), eq(""), same(properties), any()); + verify(publishChannel).confirmSelect(); + verify(publishChannel, times(numberOfMessage)).getNextPublishSeqNo(); + verify(publishChannel, times(numberOfMessage)).basicPublish(anyString(), anyString(), eq(properties), any(byte[].class)); + assertEquals(numberOfMessage, nackedMessages.size()); + } + + + @Test + public void testConnection() { + ActorConfig config = new ActorConfig(); + RMQConfig config1 = new RMQConfig(); + DirectExecutorService executorService = new DirectExecutorService(); + Environment environment = new Environment("Name"); + RMQConnection rmqConnection = new RMQConnection("Name", config1, executorService, environment, new TtlConfig()); + + assertSame(rmqConnection, + (new UnmanagedPublisher<>("Name", config, rmqConnection, + new com.fasterxml.jackson.databind.ObjectMapper())) + .connection()); + } + + + @Test + public void testMapper() { + ActorConfig config = new ActorConfig(); + RMQConfig config1 = new RMQConfig(); + DirectExecutorService executorService = new DirectExecutorService(); + Environment environment = new Environment("Name"); + RMQConnection connection = new RMQConnection("Name", config1, executorService, environment, new TtlConfig()); + + com.fasterxml.jackson.databind.ObjectMapper objectMapper = new com.fasterxml.jackson.databind.ObjectMapper(); + assertSame(objectMapper, (new UnmanagedPublisher<>("Name", config, connection, objectMapper)).mapper()); + } + + private void confirmSingle(long sequenceNumber) { + // Do nothing + } + + private void confirmMultiple(long sequenceNumber) { + // Do nothing + } + + private void nack(long sequenceNumber, boolean multiple) { + // Do nothing + } +} diff --git a/src/test/java/io/appform/dropwizard/actors/connectivity/actor/NamespacedQueuesTest.java b/src/test/java/io/appform/dropwizard/actors/connectivity/actor/NamespacedQueuesTest.java index baabd59f..ea12c509 100644 --- a/src/test/java/io/appform/dropwizard/actors/connectivity/actor/NamespacedQueuesTest.java +++ b/src/test/java/io/appform/dropwizard/actors/connectivity/actor/NamespacedQueuesTest.java @@ -35,8 +35,8 @@ @Slf4j public class NamespacedQueuesTest { - private static final int RABBITMQ_MANAGEMENT_PORT = 15672; - private static final String RABBITMQ_DOCKER_IMAGE = "rabbitmq:3-management"; + private static final int RABBITMQ_MANAGEMENT_PORT = 5672; + private static final String RABBITMQ_DOCKER_IMAGE = "rabbitmq:3.8-alpine"; private static final String RABBITMQ_USERNAME = "guest"; private static final String RABBITMQ_PASSWORD = "guest"; private static final String NAMESPACE_ENV_NAME = "namespace1"; @@ -198,7 +198,7 @@ private static GenericContainer rabbitMQContainer() { private static RMQConfig getRMQConfig(GenericContainer rabbitmqContainer) { RMQConfig rmqConfig = new RMQConfig(); - Integer mappedPort = rabbitmqContainer.getMappedPort(5672); + Integer mappedPort = rabbitmqContainer.getMappedPort(RABBITMQ_MANAGEMENT_PORT); String host = rabbitmqContainer.getContainerIpAddress(); List brokers = new ArrayList(); brokers.add(new Broker(host, mappedPort));