From 5dbd7d8c7c1f8517c3888dfdf2f1de8661d8eda8 Mon Sep 17 00:00:00 2001 From: rwalerow Date: Tue, 19 Feb 2019 14:07:22 +0100 Subject: [PATCH 01/14] Add inflight messages limit queue data param --- core/src/main/scala/org/elasticmq/QueueData.scala | 3 ++- .../main/scala/org/elasticmq/server/ElasticMQServer.scala | 5 +++-- .../main/scala/org/elasticmq/server/config/CreateQueue.scala | 3 ++- .../org/elasticmq/server/config/ElasticMQServerConfig.scala | 3 ++- 4 files changed, 9 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/elasticmq/QueueData.scala b/core/src/main/scala/org/elasticmq/QueueData.scala index 2ba74c90c..b2b9b0a03 100644 --- a/core/src/main/scala/org/elasticmq/QueueData.scala +++ b/core/src/main/scala/org/elasticmq/QueueData.scala @@ -13,6 +13,7 @@ case class QueueData(name: String, hasContentBasedDeduplication: Boolean = false, copyMessagesTo: Option[String] = None, moveMessagesTo: Option[String] = None, - tags: Map[String, String] = Map[String, String]()) + tags: Map[String, String] = Map[String, String](), + inflightMessagesLimit: Option[Int] = None) case class DeadLettersQueueData(name: String, maxReceiveCount: Int) diff --git a/server/src/main/scala/org/elasticmq/server/ElasticMQServer.scala b/server/src/main/scala/org/elasticmq/server/ElasticMQServer.scala index a256aec60..e57a17993 100644 --- a/server/src/main/scala/org/elasticmq/server/ElasticMQServer.scala +++ b/server/src/main/scala/org/elasticmq/server/ElasticMQServer.scala @@ -55,6 +55,7 @@ class ElasticMQServer(config: ElasticMQServerConfig) extends Logging { } else { None } + } private def createQueues(queueManagerActor: ActorRef): Unit = { @@ -85,8 +86,8 @@ class ElasticMQServer(config: ElasticMQServerConfig) extends Logging { hasContentBasedDeduplication = cq.hasContentBasedDeduplication, copyMessagesTo = cq.copyMessagesTo, moveMessagesTo = cq.moveMessagesTo, - tags = cq.tags + tags = cq.tags, + inflightMessagesLimit = cq.inflightMessagesLimit ) } - } diff --git a/server/src/main/scala/org/elasticmq/server/config/CreateQueue.scala b/server/src/main/scala/org/elasticmq/server/config/CreateQueue.scala index 78fd3e014..387e47ff3 100644 --- a/server/src/main/scala/org/elasticmq/server/config/CreateQueue.scala +++ b/server/src/main/scala/org/elasticmq/server/config/CreateQueue.scala @@ -9,4 +9,5 @@ case class CreateQueue(name: String, hasContentBasedDeduplication: Boolean, copyMessagesTo: Option[String] = None, moveMessagesTo: Option[String] = None, - tags: Map[String, String] = Map[String, String]()) + tags: Map[String, String] = Map[String, String](), + inflightMessagesLimit: Option[Int] = None) diff --git a/server/src/main/scala/org/elasticmq/server/config/ElasticMQServerConfig.scala b/server/src/main/scala/org/elasticmq/server/config/ElasticMQServerConfig.scala index 6f8588e2c..b919f5107 100644 --- a/server/src/main/scala/org/elasticmq/server/config/ElasticMQServerConfig.scala +++ b/server/src/main/scala/org/elasticmq/server/config/ElasticMQServerConfig.scala @@ -94,7 +94,8 @@ class ElasticMQServerConfig(config: Config) extends Logging { hasContentBasedDeduplication = getOptionalBoolean(c, "contentBasedDeduplication").getOrElse(false), copyMessagesTo = getOptionalString(c, "copyTo"), moveMessagesTo = getOptionalString(c, "moveTo"), - tags = getOptionalTags(c, "tags") + tags = getOptionalTags(c, "tags"), + inflightMessagesLimit = None ) } .toList From 3887666870d21c493ec3d756bf88ac37414af0b0 Mon Sep 17 00:00:00 2001 From: rwalerow Date: Tue, 19 Feb 2019 14:08:17 +0100 Subject: [PATCH 02/14] Add test for inflight messages limit --- .../rest/sqs/AmazonJavaSdkTestSuite.scala | 69 ++++++++++++++++++- 1 file changed, 68 insertions(+), 1 deletion(-) diff --git a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/AmazonJavaSdkTestSuite.scala b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/AmazonJavaSdkTestSuite.scala index 03a067c2e..e41900bfd 100644 --- a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/AmazonJavaSdkTestSuite.scala +++ b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/AmazonJavaSdkTestSuite.scala @@ -3,17 +3,23 @@ package org.elasticmq.rest.sqs import java.nio.ByteBuffer import java.util.UUID +import akka.actor.{ActorRef, ActorSystem, Props} +import akka.util.Timeout import com.amazonaws.AmazonServiceException import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials} import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration import com.amazonaws.services.sqs.model._ import com.amazonaws.services.sqs.{AmazonSQS, AmazonSQSClientBuilder} import org.elasticmq._ +import org.elasticmq.actor.QueueManagerActor +import org.elasticmq.msg.LookupQueue import org.elasticmq.rest.sqs.model.RedrivePolicy -import org.elasticmq.util.Logging +import org.elasticmq.util.{Logging, NowProvider} +import org.joda.time.{DateTime, Duration} import org.scalatest.{Matchers, _} import scala.collection.JavaConverters._ +import scala.concurrent.Await import scala.util.Try import scala.util.control.Exception._ @@ -26,15 +32,23 @@ class AmazonJavaSdkTestSuite extends FunSuite with Matchers with BeforeAndAfter var client: AmazonSQS = _ // strict server var relaxedClient: AmazonSQS = _ + var withQueueClient: AmazonSQS = _ // strict server var currentTestName: String = _ var strictServer: SQSRestServer = _ var relaxedServer: SQSRestServer = _ + var withQueueServer: SQSRestServer = _ + + var actorSystem: ActorSystem = _ + var queueManagerActor: ActorRef = _ before { logger.info(s"\n---\nRunning test: $currentTestName\n---\n") + actorSystem = ActorSystem("AmazonJavaSdkTestSuite") + queueManagerActor = createQueueManager(actorSystem) + strictServer = SQSRestServerBuilder .withPort(9321) .withServerAddress(NodeAddress(port = 9321)) @@ -46,6 +60,12 @@ class AmazonJavaSdkTestSuite extends FunSuite with Matchers with BeforeAndAfter .withSQSLimits(SQSLimits.Relaxed) .start() + withQueueServer = SQSRestServerBuilder + .withPort(9323) + .withServerAddress(NodeAddress(port = 9323)) + .withQueueManagerActor(queueManagerActor) + .start() + strictServer.waitUntilStarted() relaxedServer.waitUntilStarted() @@ -60,6 +80,12 @@ class AmazonJavaSdkTestSuite extends FunSuite with Matchers with BeforeAndAfter .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("x", "x"))) .withEndpointConfiguration(new EndpointConfiguration("http://localhost:9322", "us-east-1")) .build() + + withQueueClient = AmazonSQSClientBuilder + .standard() + .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("x", "x"))) + .withEndpointConfiguration(new EndpointConfiguration("http://localhost:9323", "us-east-1")) + .build() } after { @@ -1696,6 +1722,33 @@ class AmazonJavaSdkTestSuite extends FunSuite with Matchers with BeforeAndAfter } } + test("should hit inflight messages limit") { + import org.elasticmq.actor.reply._ + implicit val timeout = { + import scala.concurrent.duration._ + Timeout(5.seconds) + } + + // crate queue with limit + val createQueue = defaultCreateQueueData().copy(name = "test-queue",inflightMessagesLimit = Some(6)) + val f = queueManagerActor ? org.elasticmq.msg.CreateQueue(createQueue) + Await.result(f, timeout.duration) + + val testQueueUrl = "http://localhost:9323/queue/test-queue" + + (1 to 10).foreach{ i => + withQueueClient.sendMessage(testQueueUrl, s"Message $i") + } + + (1 to 5).foreach{ i => + val _ = withQueueClient.receiveMessage(testQueueUrl) + } + + assertThrows[OverLimitException]{ + withQueueClient.receiveMessage(testQueueUrl).getSdkResponseMetadata() + } + } + def queueDelay(queueUrl: String): Long = getQueueLongAttribute(queueUrl, delaySecondsAttribute) def getQueueLongAttribute(queueUrl: String, attributeName: String): Long = { @@ -1762,4 +1815,18 @@ class AmazonJavaSdkTestSuite extends FunSuite with Matchers with BeforeAndAfter val createRequest2 = attributes.foldLeft(createRequest1) { case (acc, (k, v)) => acc.addAttributesEntry(k, v) } client.createQueue(createRequest2).getQueueUrl } + + private def defaultCreateQueueData(): QueueData = { + QueueData( + name = "test-queue", + defaultVisibilityTimeout = MillisVisibilityTimeout(1000 * 60), + delay = Duration.standardSeconds(30L), + receiveMessageWait = Duration.standardSeconds(30L), + created = DateTime.now(), + lastModified = DateTime.now()) + } + + private def createQueueManager(actorSystem: ActorSystem): ActorRef = { + actorSystem.actorOf(Props(new QueueManagerActor(new NowProvider()))) + } } From a5edd20816dc3864a5125a66ed8361f5e42e3ba3 Mon Sep 17 00:00:00 2001 From: rwalerow Date: Thu, 21 Feb 2019 12:29:55 +0100 Subject: [PATCH 03/14] Fix AmazonJavaSdkTestSuite for multiple runs --- .../rest/sqs/AmazonJavaSdkTestSuite.scala | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/AmazonJavaSdkTestSuite.scala b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/AmazonJavaSdkTestSuite.scala index e41900bfd..9d29ca85b 100644 --- a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/AmazonJavaSdkTestSuite.scala +++ b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/AmazonJavaSdkTestSuite.scala @@ -91,10 +91,12 @@ class AmazonJavaSdkTestSuite extends FunSuite with Matchers with BeforeAndAfter after { client.shutdown() relaxedClient.shutdown() + withQueueClient.shutdown() // TODO: Figure out why this intermittently isn't able to unbind cleanly Try(strictServer.stopAndWait()) Try(relaxedServer.stopAndWait()) + Try(withQueueServer.stopAndWait()) logger.info(s"\n---\nTest done: $currentTestName\n---\n") } @@ -1730,21 +1732,21 @@ class AmazonJavaSdkTestSuite extends FunSuite with Matchers with BeforeAndAfter } // crate queue with limit - val createQueue = defaultCreateQueueData().copy(name = "test-queue",inflightMessagesLimit = Some(6)) + val createQueue = defaultCreateQueueData().copy(name = "test-queue", inflightMessagesLimit = 5) val f = queueManagerActor ? org.elasticmq.msg.CreateQueue(createQueue) Await.result(f, timeout.duration) - + val testQueueUrl = "http://localhost:9323/queue/test-queue" - (1 to 10).foreach{ i => + (1 to 10).foreach { i => withQueueClient.sendMessage(testQueueUrl, s"Message $i") } - (1 to 5).foreach{ i => + (1 to 5).foreach { _ => val _ = withQueueClient.receiveMessage(testQueueUrl) } - assertThrows[OverLimitException]{ + assertThrows[OverLimitException] { withQueueClient.receiveMessage(testQueueUrl).getSdkResponseMetadata() } } @@ -1820,10 +1822,12 @@ class AmazonJavaSdkTestSuite extends FunSuite with Matchers with BeforeAndAfter QueueData( name = "test-queue", defaultVisibilityTimeout = MillisVisibilityTimeout(1000 * 60), - delay = Duration.standardSeconds(30L), - receiveMessageWait = Duration.standardSeconds(30L), + delay = Duration.standardSeconds(1L), + receiveMessageWait = Duration.standardSeconds(1L), created = DateTime.now(), - lastModified = DateTime.now()) + lastModified = DateTime.now(), + inflightMessagesLimit = 1000 + ) } private def createQueueManager(actorSystem: ActorSystem): ActorRef = { From 58377de37eb034732fc4aa32c4718866721782c1 Mon Sep 17 00:00:00 2001 From: rwalerow Date: Thu, 21 Feb 2019 12:35:25 +0100 Subject: [PATCH 04/14] InflightMessagesLimit is required for queue data --- .../main/scala/org/elasticmq/QueueData.scala | 4 ++-- .../actor/QueueActorQueueOpsTest.scala | 17 +++++++++++++++-- .../actor/test/DataCreationHelpers.scala | 4 +++- .../rest/sqs/CreateQueueDirectives.scala | 3 +++ .../org/elasticmq/server/ElasticMQServer.scala | 4 +++- 5 files changed, 26 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/elasticmq/QueueData.scala b/core/src/main/scala/org/elasticmq/QueueData.scala index b2b9b0a03..ab9167b1b 100644 --- a/core/src/main/scala/org/elasticmq/QueueData.scala +++ b/core/src/main/scala/org/elasticmq/QueueData.scala @@ -8,12 +8,12 @@ case class QueueData(name: String, receiveMessageWait: Duration, created: DateTime, lastModified: DateTime, + inflightMessagesLimit: Int, deadLettersQueue: Option[DeadLettersQueueData] = None, isFifo: Boolean = false, hasContentBasedDeduplication: Boolean = false, copyMessagesTo: Option[String] = None, moveMessagesTo: Option[String] = None, - tags: Map[String, String] = Map[String, String](), - inflightMessagesLimit: Option[Int] = None) + tags: Map[String, String] = Map[String, String]()) case class DeadLettersQueueData(name: String, maxReceiveCount: Int) diff --git a/core/src/test/scala/org/elasticmq/actor/QueueActorQueueOpsTest.scala b/core/src/test/scala/org/elasticmq/actor/QueueActorQueueOpsTest.scala index 7e2af48c8..80c2efc22 100644 --- a/core/src/test/scala/org/elasticmq/actor/QueueActorQueueOpsTest.scala +++ b/core/src/test/scala/org/elasticmq/actor/QueueActorQueueOpsTest.scala @@ -12,17 +12,30 @@ class QueueActorQueueOpsTest extends ActorTest with QueueManagerForEachTest with // Given val created = new DateTime(1216168602L) val lastModified = new DateTime(1316168602L) + val inflightMessagesLimit = 100 for { Right(queueActor) <- queueManagerActor ? CreateQueue( - QueueData("q1", MillisVisibilityTimeout(1L), Duration.ZERO, Duration.ZERO, created, lastModified)) + QueueData("q1", + MillisVisibilityTimeout(1L), + Duration.ZERO, + Duration.ZERO, + created, + lastModified, + inflightMessagesLimit)) // When queueData <- queueActor ? GetQueueData() } yield { // Then queueData should be( - QueueData("q1", MillisVisibilityTimeout(1L), Duration.ZERO, Duration.ZERO, created, lastModified)) + QueueData("q1", + MillisVisibilityTimeout(1L), + Duration.ZERO, + Duration.ZERO, + created, + lastModified, + inflightMessagesLimit)) } } diff --git a/core/src/test/scala/org/elasticmq/actor/test/DataCreationHelpers.scala b/core/src/test/scala/org/elasticmq/actor/test/DataCreationHelpers.scala index b84141058..731bbdedc 100644 --- a/core/src/test/scala/org/elasticmq/actor/test/DataCreationHelpers.scala +++ b/core/src/test/scala/org/elasticmq/actor/test/DataCreationHelpers.scala @@ -4,6 +4,7 @@ import org.elasticmq._ import org.joda.time.{DateTime, Duration} import org.elasticmq.MessageId import org.elasticmq.MillisNextDelivery +import org.elasticmq.actor.queue.QueueActorDefaults trait DataCreationHelpers { def createQueueData( @@ -24,7 +25,8 @@ trait DataCreationHelpers { deadLettersQueue = deadLettersQueue, copyMessagesTo = copyMessagesToQueue, moveMessagesTo = moveMessagesToQueue, - tags = tags + tags = tags, + inflightMessagesLimit = QueueActorDefaults.defaultInflightMessagesLimit(false) ) def createMessageData(id: String, diff --git a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/CreateQueueDirectives.scala b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/CreateQueueDirectives.scala index 33a791202..9efc90bac 100644 --- a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/CreateQueueDirectives.scala +++ b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/CreateQueueDirectives.scala @@ -1,5 +1,6 @@ package org.elasticmq.rest.sqs +import org.elasticmq.actor.queue.QueueActorDefaults import org.elasticmq.actor.reply._ import org.elasticmq.msg.{CreateQueue, GetQueueData, LookupQueue} import org.elasticmq.rest.sqs.Constants._ @@ -68,6 +69,7 @@ trait CreateQueueDirectives { val now = new DateTime() val isFifo = attributes.get("FifoQueue").contains("true") val hasContentBasedDeduplication = attributes.get("ContentBasedDeduplication").contains("true") + val inflightMessagesLimit = QueueActorDefaults.defaultInflightMessagesLimit(isFifo) val newQueueData = QueueData( queueName, MillisVisibilityTimeout.fromSeconds(secondsVisibilityTimeout), @@ -75,6 +77,7 @@ trait CreateQueueDirectives { Duration.standardSeconds(secondsReceiveMessageWaitTime), now, now, + inflightMessagesLimit, redrivePolicy.map(rd => DeadLettersQueueData(rd.queueName, rd.maxReceiveCount)), isFifo, hasContentBasedDeduplication diff --git a/server/src/main/scala/org/elasticmq/server/ElasticMQServer.scala b/server/src/main/scala/org/elasticmq/server/ElasticMQServer.scala index e57a17993..39494e916 100644 --- a/server/src/main/scala/org/elasticmq/server/ElasticMQServer.scala +++ b/server/src/main/scala/org/elasticmq/server/ElasticMQServer.scala @@ -3,6 +3,7 @@ package org.elasticmq.server import akka.actor.{ActorRef, ActorSystem, Props} import akka.util.Timeout import org.elasticmq.actor.QueueManagerActor +import org.elasticmq.actor.queue.QueueActorDefaults import org.elasticmq.actor.reply._ import org.elasticmq.rest.sqs.{CreateQueueDirectives, SQSRestServer, TheSQSRestServerBuilder} import org.elasticmq.server.config.{CreateQueue, ElasticMQServerConfig} @@ -87,7 +88,8 @@ class ElasticMQServer(config: ElasticMQServerConfig) extends Logging { copyMessagesTo = cq.copyMessagesTo, moveMessagesTo = cq.moveMessagesTo, tags = cq.tags, - inflightMessagesLimit = cq.inflightMessagesLimit + inflightMessagesLimit = + cq.inflightMessagesLimit.getOrElse(QueueActorDefaults.defaultInflightMessagesLimit(cq.isFifo)) ) } } From cf0b2e621f616081f880698ddbf97e6b9f496304 Mon Sep 17 00:00:00 2001 From: rwalerow Date: Thu, 21 Feb 2019 12:36:23 +0100 Subject: [PATCH 05/14] Handle inflight messages when receive and delete message --- .../scala/org/elasticmq/ElasticMQError.scala | 5 ++ .../elasticmq/actor/queue/QueueActor.scala | 10 +++ .../actor/queue/QueueActorMessageOps.scala | 15 +++- .../actor/queue/QueueActorStorage.scala | 3 + .../queue/QueueActorWaitForMessagesOps.scala | 18 +++-- .../scala/org/elasticmq/msg/QueueMsg.scala | 2 +- .../actor/QueueActorMsgOpsTest.scala | 74 +++++++++++-------- .../rest/sqs/ReceiveMessageDirectives.scala | 13 +++- 8 files changed, 93 insertions(+), 47 deletions(-) diff --git a/core/src/main/scala/org/elasticmq/ElasticMQError.scala b/core/src/main/scala/org/elasticmq/ElasticMQError.scala index 265c29fe2..225d53740 100644 --- a/core/src/main/scala/org/elasticmq/ElasticMQError.scala +++ b/core/src/main/scala/org/elasticmq/ElasticMQError.scala @@ -15,3 +15,8 @@ class MessageDoesNotExist(val queueName: String, messageId: MessageId) extends E val code = "MessageDoesNotExist" val message = s"Message does not exist: $messageId in queue: $queueName" } + +class OverLimitLimitError(val queueName: String) extends ElasticMQError { + val code = "OverLimit" + val message = s"Inflight message limit exceeded for queue: $queueName" +} diff --git a/core/src/main/scala/org/elasticmq/actor/queue/QueueActor.scala b/core/src/main/scala/org/elasticmq/actor/queue/QueueActor.scala index 32f16579b..d48861cb4 100644 --- a/core/src/main/scala/org/elasticmq/actor/queue/QueueActor.scala +++ b/core/src/main/scala/org/elasticmq/actor/queue/QueueActor.scala @@ -27,3 +27,13 @@ class QueueActor(val nowProvider: NowProvider, case m: QueueMessageMsg[T] => receiveAndReplyMessageMsg(m) } } + +object QueueActorDefaults { + + private val defaultInflightLimitFifo = 12000 + private val defaultInflightLimitStandard = 120000 + + def defaultInflightMessagesLimit(isFifo: Boolean): Int = + if (isFifo) defaultInflightLimitFifo + else defaultInflightLimitStandard +} diff --git a/core/src/main/scala/org/elasticmq/actor/queue/QueueActorMessageOps.scala b/core/src/main/scala/org/elasticmq/actor/queue/QueueActorMessageOps.scala index d935a8800..01fed4e2f 100644 --- a/core/src/main/scala/org/elasticmq/actor/queue/QueueActorMessageOps.scala +++ b/core/src/main/scala/org/elasticmq/actor/queue/QueueActorMessageOps.scala @@ -1,6 +1,7 @@ package org.elasticmq.actor.queue import akka.actor.ActorRef +import org.elasticmq.OverLimitLimitError import org.elasticmq.actor.reply._ import org.elasticmq.msg.{DeleteMessage, LookupMessage, ReceiveMessages, SendMessage, UpdateVisibilityTimeout, _} import org.elasticmq.util.{Logging, NowProvider} @@ -17,7 +18,10 @@ trait QueueActorMessageOps extends Logging { case SendMessage(message) => handleOrRedirectMessage(message) case UpdateVisibilityTimeout(messageId, visibilityTimeout) => updateVisibilityTimeout(messageId, visibilityTimeout) case ReceiveMessages(visibilityTimeout, count, _, receiveRequestAttemptId) => - receiveMessages(visibilityTimeout, count, receiveRequestAttemptId) + if (inflightMessagesRegisty.size >= queueData.inflightMessagesLimit) + Left(new OverLimitLimitError(queueData.name)) + else + receiveMessages(visibilityTimeout, count, receiveRequestAttemptId) case DeleteMessage(deliveryReceipt) => deleteMessage(deliveryReceipt) case LookupMessage(messageId) => messageQueue.byId.get(messageId.id).map(_.toMessageData) } @@ -105,7 +109,7 @@ trait QueueActorMessageOps extends Logging { protected def receiveMessages(visibilityTimeout: VisibilityTimeout, count: Int, - receiveRequestAttemptId: Option[String]): List[MessageData] = { + receiveRequestAttemptId: Option[String]): Either[ElasticMQError, List[MessageData]] = { implicit val np = nowProvider val messages = receiveRequestAttemptId .flatMap({ attemptId => @@ -134,7 +138,11 @@ trait QueueActorMessageOps extends Logging { receiveRequestAttemptCache.add(attemptId, messages) } - messages.map(_.toMessageData) + messages.foreach { message => + inflightMessagesRegisty += message.id + } + + Right(messages.map(_.toMessageData)) } private def getMessagesFromRequestAttemptCache(receiveRequestAttemptId: String)( @@ -177,6 +185,7 @@ trait QueueActorMessageOps extends Logging { messageQueue.byId.get(msgId).foreach { msgData => if (msgData.deliveryReceipts.lastOption.contains(deliveryReceipt.receipt)) { // Just removing the msg from the map. The msg will be removed from the queue when trying to receive it. + inflightMessagesRegisty -= msgId messageQueue.remove(msgId) } } diff --git a/core/src/main/scala/org/elasticmq/actor/queue/QueueActorStorage.scala b/core/src/main/scala/org/elasticmq/actor/queue/QueueActorStorage.scala index 30f6eb76d..771315606 100644 --- a/core/src/main/scala/org/elasticmq/actor/queue/QueueActorStorage.scala +++ b/core/src/main/scala/org/elasticmq/actor/queue/QueueActorStorage.scala @@ -4,6 +4,8 @@ import akka.actor.ActorRef import org.elasticmq.QueueData import org.elasticmq.util.NowProvider +import scala.collection.mutable + trait QueueActorStorage { def nowProvider: NowProvider @@ -16,4 +18,5 @@ trait QueueActorStorage { var queueData: QueueData = initialQueueData var messageQueue = MessageQueue(queueData.isFifo) val receiveRequestAttemptCache = new ReceiveRequestAttemptCache + val inflightMessagesRegisty = new mutable.TreeSet[String]() } diff --git a/core/src/main/scala/org/elasticmq/actor/queue/QueueActorWaitForMessagesOps.scala b/core/src/main/scala/org/elasticmq/actor/queue/QueueActorWaitForMessagesOps.scala index 171c39bf9..8f7efb111 100644 --- a/core/src/main/scala/org/elasticmq/actor/queue/QueueActorWaitForMessagesOps.scala +++ b/core/src/main/scala/org/elasticmq/actor/queue/QueueActorWaitForMessagesOps.scala @@ -42,7 +42,7 @@ trait QueueActorWaitForMessagesOps extends ReplyingActor with QueueActorMessageO val result = super.receiveAndReplyMessageMsg(msg) val waitForMessages = waitForMessagesOpt.getOrElse(queueData.receiveMessageWait) - if (result == ReplyWith(Nil) && waitForMessages.getMillis > 0) { + if (result == ReplyWith(Right(Nil)) && waitForMessages.getMillis > 0) { val seq = assignSequenceFor(rm) logger.debug(s"${queueData.name}: Awaiting messages: start for sequence $seq.") scheduleTimeoutReply(seq, waitForMessages) @@ -67,13 +67,15 @@ trait QueueActorWaitForMessagesOps extends ReplyingActor with QueueActorMessageO AwaitingData(originalSender, ReceiveMessages(visibilityTimeout, count, _, receiveRequestAttemptId), _))) => val received = super.receiveMessages(visibilityTimeout, count, receiveRequestAttemptId) - if (received != Nil) { - originalSender ! received - logger.debug( - s"${queueData.name}: Awaiting messages: replying to sequence $seq with ${received.size} messages.") - awaitingReply.remove(seq) + received match { + case Right(receivedList) if receivedList.nonEmpty => + originalSender ! received + logger.debug( + s"${queueData.name}: Awaiting messages: replying to sequence $seq with ${receivedList.size} messages.") + awaitingReply.remove(seq) - tryReply() + tryReply() + case _ => () } case _ => // do nothing } @@ -87,7 +89,7 @@ trait QueueActorWaitForMessagesOps extends ReplyingActor with QueueActorMessageO } private def scheduleTimeoutReply(seq: Long, waitForMessages: Duration): Unit = { - schedule(waitForMessages.getMillis, ReplyIfTimeout(seq, Nil)) + schedule(waitForMessages.getMillis, ReplyIfTimeout(seq, Right(Nil))) } private def scheduleTryReplyWhenAvailable(): Unit = { diff --git a/core/src/main/scala/org/elasticmq/msg/QueueMsg.scala b/core/src/main/scala/org/elasticmq/msg/QueueMsg.scala index 3065560e8..864f07945 100644 --- a/core/src/main/scala/org/elasticmq/msg/QueueMsg.scala +++ b/core/src/main/scala/org/elasticmq/msg/QueueMsg.scala @@ -30,6 +30,6 @@ case class ReceiveMessages(visibilityTimeout: VisibilityTimeout, count: Int, waitForMessages: Option[Duration], receiveRequestAttemptId: Option[String]) - extends QueueMessageMsg[List[MessageData]] + extends QueueMessageMsg[Either[ElasticMQError, List[MessageData]]] case class DeleteMessage(deliveryReceipt: DeliveryReceipt) extends QueueMessageMsg[Unit] case class LookupMessage(messageId: MessageId) extends QueueMessageMsg[Option[MessageData]] diff --git a/core/src/test/scala/org/elasticmq/actor/QueueActorMsgOpsTest.scala b/core/src/test/scala/org/elasticmq/actor/QueueActorMsgOpsTest.scala index cbfc3e8f3..50af70605 100644 --- a/core/src/test/scala/org/elasticmq/actor/QueueActorMsgOpsTest.scala +++ b/core/src/test/scala/org/elasticmq/actor/QueueActorMsgOpsTest.scala @@ -72,7 +72,7 @@ class QueueActorMsgOpsTest extends ActorTest with QueueManagerForEachTest with D // When - lookupResult <- queueActor2 ? ReceiveMessages(DefaultVisibilityTimeout, 1, None, None) + Right(lookupResult) <- queueActor2 ? ReceiveMessages(DefaultVisibilityTimeout, 1, None, None) } yield { // Then lookupResult should be(Nil) @@ -91,7 +91,7 @@ class QueueActorMsgOpsTest extends ActorTest with QueueManagerForEachTest with D _ <- queueActor1 ? SendMessage(m) // When - lookupResult <- queueActor1 ? ReceiveMessages(DefaultVisibilityTimeout, 1, None, None) + Right(lookupResult) <- queueActor1 ? ReceiveMessages(DefaultVisibilityTimeout, 1, None, None) } yield { // Then withoutDeliveryReceipt(lookupResult.headOption) @@ -129,7 +129,7 @@ class QueueActorMsgOpsTest extends ActorTest with QueueManagerForEachTest with D // When lookupBeforeReceiving <- queueActor ? LookupMessage(MessageId("xyz")) - received <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 1, None, None) + Right(received) <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 1, None, None) lookupAfterReceiving <- queueActor ? LookupMessage(MessageId("xyz")) } yield { // Then @@ -154,9 +154,9 @@ class QueueActorMsgOpsTest extends ActorTest with QueueManagerForEachTest with D _ <- queueActor ? SendMessage(createNewMessageData("xyz", "123", Map(), MillisNextDelivery(50L))) // When - received1 <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 1, None, None) + Right(received1) <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 1, None, None) _ = nowProvider.mutableNowMillis.set(101L) - received2 <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 1, None, None) + Right(received2) <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 1, None, None) } yield { // Then val received1Receipt = received1.flatMap(_.deliveryReceipt) @@ -178,7 +178,7 @@ class QueueActorMsgOpsTest extends ActorTest with QueueManagerForEachTest with D _ <- queueActor ? SendMessage(createNewMessageData("xyz", "123", Map(), MillisNextDelivery(123L))) // When - receiveResult <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 1, None, None) + Right(receiveResult) <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 1, None, None) } yield { // Then receiveResult should be(Nil) @@ -220,7 +220,7 @@ class QueueActorMsgOpsTest extends ActorTest with QueueManagerForEachTest with D // When _ <- queueActor ? UpdateVisibilityTimeout(m2.id.get, MillisVisibilityTimeout(10L)) _ = nowProvider.mutableNowMillis.set(110L) - receiveResult <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 1, None, None) + Right(receiveResult) <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 1, None, None) } yield { // Then // This should find the first msg, as it has the visibility timeout decreased. @@ -236,7 +236,7 @@ class QueueActorMsgOpsTest extends ActorTest with QueueManagerForEachTest with D for { Right(queueActor) <- queueManagerActor ? CreateQueue(q1) _ <- queueActor ? SendMessage(m1) - List(m1data) <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 1, None, None) + Right(List(m1data)) <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 1, None, None) // When _ <- queueActor ? DeleteMessage(m1data.deliveryReceipt.get) @@ -258,9 +258,9 @@ class QueueActorMsgOpsTest extends ActorTest with QueueManagerForEachTest with D // When Some(lookupResult) <- queueActor ? LookupMessage(m1.id.get) - List(receiveResult1) <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 1, None, None) + Right(List(receiveResult1)) <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 1, None, None) _ = nowProvider.mutableNowMillis.set(110L) - List(receiveResult2) <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 1, None, None) + Right(List(receiveResult2)) <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 1, None, None) } yield { // Then lookupResult.statistics should be(MessageStatistics.empty) @@ -284,8 +284,8 @@ class QueueActorMsgOpsTest extends ActorTest with QueueManagerForEachTest with D _ <- queueActor ? SendMessage(m5) // When - receiveResults1 <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 3, None, None) - receiveResults2 <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 2, None, None) + Right(receiveResults1) <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 3, None, None) + Right(receiveResults2) <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 2, None, None) } yield { // Then receiveResults1.size should be(3) @@ -308,7 +308,7 @@ class QueueActorMsgOpsTest extends ActorTest with QueueManagerForEachTest with D _ <- queueActor ? SendMessage(m3) // When - receiveResults <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 5, None, None) + Right(receiveResults) <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 5, None, None) } yield { // Then receiveResults.size should be(3) @@ -326,7 +326,10 @@ class QueueActorMsgOpsTest extends ActorTest with QueueManagerForEachTest with D Right(queueActor) <- queueManagerActor ? CreateQueue(q1) // When - receiveResults <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 5, Some(Duration.millis(500L)), None) + Right(receiveResults) <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, + 5, + Some(Duration.millis(500L)), + None) } yield { // Then val end = System.currentTimeMillis() @@ -358,8 +361,8 @@ class QueueActorMsgOpsTest extends ActorTest with QueueManagerForEachTest with D receiveResults <- receiveResultsFuture } yield { // Then - receiveResults.size should be(1) - receiveResults.map(_.id) should be(msg.id.toList) + receiveResults.getOrElse(List.empty).size should be(1) + receiveResults.getOrElse(List.empty).map(_.id) should be(msg.id.toList) } } @@ -384,8 +387,8 @@ class QueueActorMsgOpsTest extends ActorTest with QueueManagerForEachTest with D _ <- { Thread.sleep(500); queueActor ? SendMessage(msg) } - receiveResults1 <- receiveResults1Future - receiveResults2 <- receiveResults2Future + Right(receiveResults1) <- receiveResults1Future + Right(receiveResults2) <- receiveResults2Future } yield { // Then val end = System.currentTimeMillis() @@ -426,9 +429,9 @@ class QueueActorMsgOpsTest extends ActorTest with QueueManagerForEachTest with D queueActor ? SendMessage(msg2) } - receiveResults1 <- receiveResults1Future - receiveResults2 <- receiveResults2Future - receiveResults3 <- receiveResults3Future + Right(receiveResults1) <- receiveResults1Future + Right(receiveResults2) <- receiveResults2Future + Right(receiveResults3) <- receiveResults3Future } yield { // Then List(receiveResults1.size, receiveResults2.size, receiveResults3.size).sum should be(2) @@ -455,10 +458,13 @@ class QueueActorMsgOpsTest extends ActorTest with QueueManagerForEachTest with D _ <- queueActor ? SendMessage(m1) // When - receiveResults <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 5, None, None) + Right(receiveResults) <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 5, None, None) _ = nowProvider.mutableNowMillis.set(1000L) - receiveResultsEmpty <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 5, None, None) - receiveResultsDeadLettersQueue <- deadLettersQueueActor ? ReceiveMessages(DefaultVisibilityTimeout, 5, None, None) + Right(receiveResultsEmpty) <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 5, None, None) + Right(receiveResultsDeadLettersQueue) <- deadLettersQueueActor ? ReceiveMessages(DefaultVisibilityTimeout, + 5, + None, + None) } yield { // Then receiveResults.size should be(1) @@ -490,10 +496,13 @@ class QueueActorMsgOpsTest extends ActorTest with QueueManagerForEachTest with D _ <- queueActor ? SendMessage(m1) // When - receiveResults <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 5, None, None) + Right(receiveResults) <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 5, None, None) _ = nowProvider.mutableNowMillis.set(1000L) - receiveResultsEmpty <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 5, None, None) - receiveResultsDeadLettersQueue <- deadLettersQueueActor ? ReceiveMessages(DefaultVisibilityTimeout, 5, None, None) + Right(receiveResultsEmpty) <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 5, None, None) + Right(receiveResultsDeadLettersQueue) <- deadLettersQueueActor ? ReceiveMessages(DefaultVisibilityTimeout, + 5, + None, + None) } yield { // Then receiveResults.size should be(1) @@ -519,8 +528,8 @@ class QueueActorMsgOpsTest extends ActorTest with QueueManagerForEachTest with D // When sendAck <- queueActor ? SendMessage(m1) _ = nowProvider.mutableNowMillis.set(1000L) - receiveResultsOriginal <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 5, None, None) - receiveResultsMoved <- redirectToActor ? ReceiveMessages(DefaultVisibilityTimeout, 5, None, None) + Right(receiveResultsOriginal) <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 5, None, None) + Right(receiveResultsMoved) <- redirectToActor ? ReceiveMessages(DefaultVisibilityTimeout, 5, None, None) } yield { // Then sendAck.id should be(MessageId(m1ID)) @@ -543,8 +552,11 @@ class QueueActorMsgOpsTest extends ActorTest with QueueManagerForEachTest with D // When _ <- queueActor ? SendMessage(m1) _ = nowProvider.mutableNowMillis.set(1000L) - resultsOriginal <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 5, None, None) - resultsCopied <- copyToActor ? ReceiveMessages(DefaultVisibilityTimeout, 5, Some(Duration.millis(500)), None) + Right(resultsOriginal) <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 5, None, None) + Right(resultsCopied) <- copyToActor ? ReceiveMessages(DefaultVisibilityTimeout, + 5, + Some(Duration.millis(500)), + None) } yield { // Then resultsOriginal.map(_.id) should be(List(MessageId(m1ID))) diff --git a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/ReceiveMessageDirectives.scala b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/ReceiveMessageDirectives.scala index 5a53a7d31..fbbe80dc4 100644 --- a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/ReceiveMessageDirectives.scala +++ b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/ReceiveMessageDirectives.scala @@ -106,9 +106,10 @@ trait ReceiveMessageDirectives { } } - msgsFuture.map { msgs => - respondWith { - + msgsFuture.map { + case Right(msgs) => + respondWith { + {msgs.map { msg => val receipt = msg.deliveryReceipt.map(_.receipt).getOrElse(throw new RuntimeException("No receipt for a received msg.")) @@ -127,7 +128,11 @@ trait ReceiveMessageDirectives { {EmptyRequestId} - } + } + case Left(sqsError) => + respondWith(400) { + new SQSException(sqsError.code, 400, sqsError.code).toXml(EmptyRequestId) + } } } } From 3ee1da389d156635125262c305364b023b5ce879 Mon Sep 17 00:00:00 2001 From: rwalerow Date: Thu, 21 Feb 2019 12:37:28 +0100 Subject: [PATCH 06/14] Test scenario where user deletes messages, so no inflight error should occure --- .../rest/sqs/AmazonJavaSdkTestSuite.scala | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/AmazonJavaSdkTestSuite.scala b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/AmazonJavaSdkTestSuite.scala index 9d29ca85b..c7f8914cb 100644 --- a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/AmazonJavaSdkTestSuite.scala +++ b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/AmazonJavaSdkTestSuite.scala @@ -1751,6 +1751,33 @@ class AmazonJavaSdkTestSuite extends FunSuite with Matchers with BeforeAndAfter } } + test("should not encounter inflight limit exception when deleting messages") { + import org.elasticmq.actor.reply._ + implicit val timeout = { + import scala.concurrent.duration._ + Timeout(5.seconds) + } + + // crate queue with limit + val createQueue = defaultCreateQueueData().copy(name = "test-queue", inflightMessagesLimit = 5) + val f = queueManagerActor ? org.elasticmq.msg.CreateQueue(createQueue) + Await.result(f, timeout.duration) + + val testQueueUrl = "http://localhost:9323/queue/test-queue" + + (1 to 40).foreach { i => + withQueueClient.sendMessage(testQueueUrl, s"Message $i") + } + + (1 to 40).foreach { _ => + val received = withQueueClient.receiveMessage(testQueueUrl) + val messageId = received.getMessages.get(0).getReceiptHandle + val _ = withQueueClient.deleteMessage(testQueueUrl, messageId) + } + + () shouldBe (()) + } + def queueDelay(queueUrl: String): Long = getQueueLongAttribute(queueUrl, delaySecondsAttribute) def getQueueLongAttribute(queueUrl: String, attributeName: String): Long = { From e8fe92af3463a099975adb3f4bcfaa4b810b2f95 Mon Sep 17 00:00:00 2001 From: rwalerow Date: Thu, 21 Feb 2019 13:16:40 +0100 Subject: [PATCH 07/14] Adjust infligntMessages for fifoqueue accodring to sqs documentation --- core/src/main/scala/org/elasticmq/actor/queue/QueueActor.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/elasticmq/actor/queue/QueueActor.scala b/core/src/main/scala/org/elasticmq/actor/queue/QueueActor.scala index d48861cb4..343dcda3d 100644 --- a/core/src/main/scala/org/elasticmq/actor/queue/QueueActor.scala +++ b/core/src/main/scala/org/elasticmq/actor/queue/QueueActor.scala @@ -30,7 +30,7 @@ class QueueActor(val nowProvider: NowProvider, object QueueActorDefaults { - private val defaultInflightLimitFifo = 12000 + private val defaultInflightLimitFifo = 20000 private val defaultInflightLimitStandard = 120000 def defaultInflightMessagesLimit(isFifo: Boolean): Int = From 68a8ad2c45dd44dd31c7d6e62c291f985cd11f00 Mon Sep 17 00:00:00 2001 From: rwalerow Date: Thu, 21 Feb 2019 13:17:25 +0100 Subject: [PATCH 08/14] Move return error to receiveMessages method --- .../actor/queue/QueueActorMessageOps.scala | 65 ++++++++++--------- 1 file changed, 33 insertions(+), 32 deletions(-) diff --git a/core/src/main/scala/org/elasticmq/actor/queue/QueueActorMessageOps.scala b/core/src/main/scala/org/elasticmq/actor/queue/QueueActorMessageOps.scala index 01fed4e2f..2beaf88ca 100644 --- a/core/src/main/scala/org/elasticmq/actor/queue/QueueActorMessageOps.scala +++ b/core/src/main/scala/org/elasticmq/actor/queue/QueueActorMessageOps.scala @@ -18,10 +18,7 @@ trait QueueActorMessageOps extends Logging { case SendMessage(message) => handleOrRedirectMessage(message) case UpdateVisibilityTimeout(messageId, visibilityTimeout) => updateVisibilityTimeout(messageId, visibilityTimeout) case ReceiveMessages(visibilityTimeout, count, _, receiveRequestAttemptId) => - if (inflightMessagesRegisty.size >= queueData.inflightMessagesLimit) - Left(new OverLimitLimitError(queueData.name)) - else - receiveMessages(visibilityTimeout, count, receiveRequestAttemptId) + receiveMessages(visibilityTimeout, count, receiveRequestAttemptId) case DeleteMessage(deliveryReceipt) => deleteMessage(deliveryReceipt) case LookupMessage(messageId) => messageQueue.byId.get(messageId.id).map(_.toMessageData) } @@ -110,39 +107,43 @@ trait QueueActorMessageOps extends Logging { protected def receiveMessages(visibilityTimeout: VisibilityTimeout, count: Int, receiveRequestAttemptId: Option[String]): Either[ElasticMQError, List[MessageData]] = { - implicit val np = nowProvider - val messages = receiveRequestAttemptId - .flatMap({ attemptId => - // for a given request id, check for any messages we've dequeued and cached - val cachedMessages = getMessagesFromRequestAttemptCache(attemptId) - - // if the cache returns an empty list instead of None, we still want to pull messages from - // from the queue so return None in that case to properly process down stream - cachedMessages.getOrElse(Nil) match { - case Nil => None - case default => Some(default) + if (inflightMessagesRegisty.size >= queueData.inflightMessagesLimit) + Left(new OverLimitLimitError(queueData.name)) + else { + implicit val np = nowProvider + val messages = receiveRequestAttemptId + .flatMap({ attemptId => + // for a given request id, check for any messages we've dequeued and cached + val cachedMessages = getMessagesFromRequestAttemptCache(attemptId) + + // if the cache returns an empty list instead of None, we still want to pull messages from + // from the queue so return None in that case to properly process down stream + cachedMessages.getOrElse(Nil) match { + case Nil => None + case default => Some(default) + } + }) + .getOrElse(getMessagesFromQueue(visibilityTimeout, count)) + .map { internalMessage => + // Putting the msg again into the queue, with a new next delivery + val newNextDelivery = computeNextDelivery(visibilityTimeout) + internalMessage.trackDelivery(newNextDelivery) + messageQueue += internalMessage + + logger.debug(s"${queueData.name}: Receiving message ${internalMessage.id}") + internalMessage } - }) - .getOrElse(getMessagesFromQueue(visibilityTimeout, count)) - .map { internalMessage => - // Putting the msg again into the queue, with a new next delivery - val newNextDelivery = computeNextDelivery(visibilityTimeout) - internalMessage.trackDelivery(newNextDelivery) - messageQueue += internalMessage - - logger.debug(s"${queueData.name}: Receiving message ${internalMessage.id}") - internalMessage + + receiveRequestAttemptId.foreach { attemptId => + receiveRequestAttemptCache.add(attemptId, messages) } - receiveRequestAttemptId.foreach { attemptId => - receiveRequestAttemptCache.add(attemptId, messages) - } + messages.foreach { message => + inflightMessagesRegisty += message.id + } - messages.foreach { message => - inflightMessagesRegisty += message.id + Right(messages.map(_.toMessageData)) } - - Right(messages.map(_.toMessageData)) } private def getMessagesFromRequestAttemptCache(receiveRequestAttemptId: String)( From 1b19ec0b5aa9b9925f61fe8f9272af9c3a370282 Mon Sep 17 00:00:00 2001 From: rwalerow Date: Thu, 21 Feb 2019 13:19:51 +0100 Subject: [PATCH 09/14] Rename OverLimitError --- core/src/main/scala/org/elasticmq/ElasticMQError.scala | 2 +- .../org/elasticmq/actor/queue/QueueActorMessageOps.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/elasticmq/ElasticMQError.scala b/core/src/main/scala/org/elasticmq/ElasticMQError.scala index 225d53740..8016552f1 100644 --- a/core/src/main/scala/org/elasticmq/ElasticMQError.scala +++ b/core/src/main/scala/org/elasticmq/ElasticMQError.scala @@ -16,7 +16,7 @@ class MessageDoesNotExist(val queueName: String, messageId: MessageId) extends E val message = s"Message does not exist: $messageId in queue: $queueName" } -class OverLimitLimitError(val queueName: String) extends ElasticMQError { +class OverLimitError(val queueName: String) extends ElasticMQError { val code = "OverLimit" val message = s"Inflight message limit exceeded for queue: $queueName" } diff --git a/core/src/main/scala/org/elasticmq/actor/queue/QueueActorMessageOps.scala b/core/src/main/scala/org/elasticmq/actor/queue/QueueActorMessageOps.scala index 2beaf88ca..f5a276bc6 100644 --- a/core/src/main/scala/org/elasticmq/actor/queue/QueueActorMessageOps.scala +++ b/core/src/main/scala/org/elasticmq/actor/queue/QueueActorMessageOps.scala @@ -1,7 +1,7 @@ package org.elasticmq.actor.queue import akka.actor.ActorRef -import org.elasticmq.OverLimitLimitError +import org.elasticmq.OverLimitError import org.elasticmq.actor.reply._ import org.elasticmq.msg.{DeleteMessage, LookupMessage, ReceiveMessages, SendMessage, UpdateVisibilityTimeout, _} import org.elasticmq.util.{Logging, NowProvider} @@ -108,7 +108,7 @@ trait QueueActorMessageOps extends Logging { count: Int, receiveRequestAttemptId: Option[String]): Either[ElasticMQError, List[MessageData]] = { if (inflightMessagesRegisty.size >= queueData.inflightMessagesLimit) - Left(new OverLimitLimitError(queueData.name)) + Left(new OverLimitError(queueData.name)) else { implicit val np = nowProvider val messages = receiveRequestAttemptId From 79ca1106a23d676f0fbb1f01cedd77785bd2c129 Mon Sep 17 00:00:00 2001 From: rwalerow Date: Thu, 21 Feb 2019 13:58:25 +0100 Subject: [PATCH 10/14] fix invalid handling of receive response in QueueActorMsgOpsTest --- .../scala/org/elasticmq/actor/QueueActorMsgOpsTest.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/test/scala/org/elasticmq/actor/QueueActorMsgOpsTest.scala b/core/src/test/scala/org/elasticmq/actor/QueueActorMsgOpsTest.scala index 50af70605..f2e1303fd 100644 --- a/core/src/test/scala/org/elasticmq/actor/QueueActorMsgOpsTest.scala +++ b/core/src/test/scala/org/elasticmq/actor/QueueActorMsgOpsTest.scala @@ -358,11 +358,11 @@ class QueueActorMsgOpsTest extends ActorTest with QueueManagerForEachTest with D queueActor ? SendMessage(msg) } - receiveResults <- receiveResultsFuture + Right(receiveResults) <- receiveResultsFuture } yield { // Then - receiveResults.getOrElse(List.empty).size should be(1) - receiveResults.getOrElse(List.empty).map(_.id) should be(msg.id.toList) + receiveResults.size should be(1) + receiveResults.map(_.id) should be(msg.id.toList) } } From 954e457ae83439651476cbb21bf960348c977bd5 Mon Sep 17 00:00:00 2001 From: rwalerow Date: Thu, 21 Feb 2019 15:52:49 +0100 Subject: [PATCH 11/14] Add parsing of inflight messages limit from config --- .../org/elasticmq/server/config/ElasticMQServerConfig.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/main/scala/org/elasticmq/server/config/ElasticMQServerConfig.scala b/server/src/main/scala/org/elasticmq/server/config/ElasticMQServerConfig.scala index b919f5107..b974bd1e2 100644 --- a/server/src/main/scala/org/elasticmq/server/config/ElasticMQServerConfig.scala +++ b/server/src/main/scala/org/elasticmq/server/config/ElasticMQServerConfig.scala @@ -63,6 +63,7 @@ class ElasticMQServerConfig(config: Config) extends Logging { def getOptionalBoolean(c: Config, k: String) = if (c.hasPath(k)) Some(c.getBoolean(k)) else None def getOptionalDuration(c: Config, k: String) = if (c.hasPath(k)) Some(c.getDuration(k, TimeUnit.SECONDS)) else None def getOptionalString(c: Config, k: String) = if (c.hasPath(k)) Some(c.getString(k)).filter(_.nonEmpty) else None + def getOptionalInt(c: Config, k: String) = if (c.hasPath(k)) Some(c.getInt(k)) else None import scala.collection.JavaConverters._ @@ -95,7 +96,7 @@ class ElasticMQServerConfig(config: Config) extends Logging { copyMessagesTo = getOptionalString(c, "copyTo"), moveMessagesTo = getOptionalString(c, "moveTo"), tags = getOptionalTags(c, "tags"), - inflightMessagesLimit = None + inflightMessagesLimit = getOptionalInt(c, "inflightMessagesLimit") ) } .toList From 082672458213ef7509f8efa8d4864ed535a4e618 Mon Sep 17 00:00:00 2001 From: rwalerow Date: Thu, 23 May 2019 21:30:25 +0200 Subject: [PATCH 12/14] Properly name elasticMQError variable --- .../org/elasticmq/rest/sqs/ReceiveMessageDirectives.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/ReceiveMessageDirectives.scala b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/ReceiveMessageDirectives.scala index fbbe80dc4..3036f88d8 100644 --- a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/ReceiveMessageDirectives.scala +++ b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/ReceiveMessageDirectives.scala @@ -129,9 +129,9 @@ trait ReceiveMessageDirectives { } - case Left(sqsError) => + case Left(elasticMQError) => respondWith(400) { - new SQSException(sqsError.code, 400, sqsError.code).toXml(EmptyRequestId) + new SQSException(elasticMQError.code, 400, elasticMQError.code).toXml(EmptyRequestId) } } } From d02c62a4f9e768fac050e894c2fd074c7f2239c9 Mon Sep 17 00:00:00 2001 From: rwalerow Date: Thu, 23 May 2019 21:42:53 +0200 Subject: [PATCH 13/14] Refactor receive messages to separate method --- .../actor/queue/QueueActorMessageOps.scala | 54 ++++++++++--------- 1 file changed, 30 insertions(+), 24 deletions(-) diff --git a/core/src/main/scala/org/elasticmq/actor/queue/QueueActorMessageOps.scala b/core/src/main/scala/org/elasticmq/actor/queue/QueueActorMessageOps.scala index f5a276bc6..31d06b82b 100644 --- a/core/src/main/scala/org/elasticmq/actor/queue/QueueActorMessageOps.scala +++ b/core/src/main/scala/org/elasticmq/actor/queue/QueueActorMessageOps.scala @@ -110,32 +110,10 @@ trait QueueActorMessageOps extends Logging { if (inflightMessagesRegisty.size >= queueData.inflightMessagesLimit) Left(new OverLimitError(queueData.name)) else { - implicit val np = nowProvider - val messages = receiveRequestAttemptId - .flatMap({ attemptId => - // for a given request id, check for any messages we've dequeued and cached - val cachedMessages = getMessagesFromRequestAttemptCache(attemptId) - - // if the cache returns an empty list instead of None, we still want to pull messages from - // from the queue so return None in that case to properly process down stream - cachedMessages.getOrElse(Nil) match { - case Nil => None - case default => Some(default) - } - }) - .getOrElse(getMessagesFromQueue(visibilityTimeout, count)) - .map { internalMessage => - // Putting the msg again into the queue, with a new next delivery - val newNextDelivery = computeNextDelivery(visibilityTimeout) - internalMessage.trackDelivery(newNextDelivery) - messageQueue += internalMessage - - logger.debug(s"${queueData.name}: Receiving message ${internalMessage.id}") - internalMessage - } + val messages = receiveRequestMessages(visibilityTimeout, count, receiveRequestAttemptId) receiveRequestAttemptId.foreach { attemptId => - receiveRequestAttemptCache.add(attemptId, messages) + receiveRequestAttemptCache.add(attemptId, messages)(nowProvider) } messages.foreach { message => @@ -146,6 +124,34 @@ trait QueueActorMessageOps extends Logging { } } + private def receiveRequestMessages(visibilityTimeout: VisibilityTimeout, + count: Int, + receiveRequestAttemptId: Option[String]): List[InternalMessage] = { + implicit val np = nowProvider + receiveRequestAttemptId + .flatMap({ attemptId => + // for a given request id, check for any messages we've dequeued and cached + val cachedMessages = getMessagesFromRequestAttemptCache(attemptId) + + // if the cache returns an empty list instead of None, we still want to pull messages from + // from the queue so return None in that case to properly process down stream + cachedMessages.getOrElse(Nil) match { + case Nil => None + case default => Some(default) + } + }) + .getOrElse(getMessagesFromQueue(visibilityTimeout, count)) + .map { internalMessage => + // Putting the msg again into the queue, with a new next delivery + val newNextDelivery = computeNextDelivery(visibilityTimeout) + internalMessage.trackDelivery(newNextDelivery) + messageQueue += internalMessage + + logger.debug(s"${queueData.name}: Receiving message ${internalMessage.id}") + internalMessage + } + } + private def getMessagesFromRequestAttemptCache(receiveRequestAttemptId: String)( implicit np: NowProvider): Option[List[InternalMessage]] = { receiveRequestAttemptCache.get(receiveRequestAttemptId, messageQueue) match { From 29ec9ca99cbbec9c202756e64dac481342d76786 Mon Sep 17 00:00:00 2001 From: rwalerow Date: Thu, 23 May 2019 21:45:10 +0200 Subject: [PATCH 14/14] Remove typo/name properly inflightMessagesRegisty -> inflightMessageIds --- .../org/elasticmq/actor/queue/QueueActorMessageOps.scala | 6 +++--- .../scala/org/elasticmq/actor/queue/QueueActorStorage.scala | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/elasticmq/actor/queue/QueueActorMessageOps.scala b/core/src/main/scala/org/elasticmq/actor/queue/QueueActorMessageOps.scala index 31d06b82b..94cce2f39 100644 --- a/core/src/main/scala/org/elasticmq/actor/queue/QueueActorMessageOps.scala +++ b/core/src/main/scala/org/elasticmq/actor/queue/QueueActorMessageOps.scala @@ -107,7 +107,7 @@ trait QueueActorMessageOps extends Logging { protected def receiveMessages(visibilityTimeout: VisibilityTimeout, count: Int, receiveRequestAttemptId: Option[String]): Either[ElasticMQError, List[MessageData]] = { - if (inflightMessagesRegisty.size >= queueData.inflightMessagesLimit) + if (inflightMessageIds.size >= queueData.inflightMessagesLimit) Left(new OverLimitError(queueData.name)) else { val messages = receiveRequestMessages(visibilityTimeout, count, receiveRequestAttemptId) @@ -117,7 +117,7 @@ trait QueueActorMessageOps extends Logging { } messages.foreach { message => - inflightMessagesRegisty += message.id + inflightMessageIds += message.id } Right(messages.map(_.toMessageData)) @@ -192,7 +192,7 @@ trait QueueActorMessageOps extends Logging { messageQueue.byId.get(msgId).foreach { msgData => if (msgData.deliveryReceipts.lastOption.contains(deliveryReceipt.receipt)) { // Just removing the msg from the map. The msg will be removed from the queue when trying to receive it. - inflightMessagesRegisty -= msgId + inflightMessageIds -= msgId messageQueue.remove(msgId) } } diff --git a/core/src/main/scala/org/elasticmq/actor/queue/QueueActorStorage.scala b/core/src/main/scala/org/elasticmq/actor/queue/QueueActorStorage.scala index 771315606..fc27e8a6e 100644 --- a/core/src/main/scala/org/elasticmq/actor/queue/QueueActorStorage.scala +++ b/core/src/main/scala/org/elasticmq/actor/queue/QueueActorStorage.scala @@ -18,5 +18,5 @@ trait QueueActorStorage { var queueData: QueueData = initialQueueData var messageQueue = MessageQueue(queueData.isFifo) val receiveRequestAttemptCache = new ReceiveRequestAttemptCache - val inflightMessagesRegisty = new mutable.TreeSet[String]() + val inflightMessageIds = new mutable.TreeSet[String]() }