Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit for inflight messages #167

Closed
wants to merge 15 commits into from
Closed
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions core/src/main/scala/org/elasticmq/ElasticMQError.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,8 @@ class MessageDoesNotExist(val queueName: String, messageId: MessageId) extends E
val code = "MessageDoesNotExist"
val message = s"Message does not exist: $messageId in queue: $queueName"
}

class OverLimitError(val queueName: String) extends ElasticMQError {
val code = "OverLimit"
val message = s"Inflight message limit exceeded for queue: $queueName"
}
1 change: 1 addition & 0 deletions core/src/main/scala/org/elasticmq/QueueData.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ case class QueueData(name: String,
receiveMessageWait: Duration,
created: DateTime,
lastModified: DateTime,
inflightMessagesLimit: Int,
deadLettersQueue: Option[DeadLettersQueueData] = None,
isFifo: Boolean = false,
hasContentBasedDeduplication: Boolean = false,
Expand Down
10 changes: 10 additions & 0 deletions core/src/main/scala/org/elasticmq/actor/queue/QueueActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,13 @@ class QueueActor(val nowProvider: NowProvider,
case m: QueueMessageMsg[T] => receiveAndReplyMessageMsg(m)
}
}

object QueueActorDefaults {

private val defaultInflightLimitFifo = 20000
private val defaultInflightLimitStandard = 120000

def defaultInflightMessagesLimit(isFifo: Boolean): Int =
if (isFifo) defaultInflightLimitFifo
else defaultInflightLimitStandard
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.elasticmq.actor.queue

import akka.actor.ActorRef
import org.elasticmq.OverLimitError
import org.elasticmq.actor.reply._
import org.elasticmq.msg.{DeleteMessage, LookupMessage, ReceiveMessages, SendMessage, UpdateVisibilityTimeout, _}
import org.elasticmq.util.{Logging, NowProvider}
Expand Down Expand Up @@ -105,36 +106,44 @@ trait QueueActorMessageOps extends Logging {

protected def receiveMessages(visibilityTimeout: VisibilityTimeout,
count: Int,
receiveRequestAttemptId: Option[String]): List[MessageData] = {
implicit val np = nowProvider
val messages = receiveRequestAttemptId
.flatMap({ attemptId =>
// for a given request id, check for any messages we've dequeued and cached
val cachedMessages = getMessagesFromRequestAttemptCache(attemptId)

// if the cache returns an empty list instead of None, we still want to pull messages from
// from the queue so return None in that case to properly process down stream
cachedMessages.getOrElse(Nil) match {
case Nil => None
case default => Some(default)
receiveRequestAttemptId: Option[String]): Either[ElasticMQError, List[MessageData]] = {
if (inflightMessagesRegisty.size >= queueData.inflightMessagesLimit)
Left(new OverLimitError(queueData.name))
else {
implicit val np = nowProvider
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe extract this to a method?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did refactor receiving messages to private method

val messages = receiveRequestAttemptId
.flatMap({ attemptId =>
// for a given request id, check for any messages we've dequeued and cached
val cachedMessages = getMessagesFromRequestAttemptCache(attemptId)

// if the cache returns an empty list instead of None, we still want to pull messages from
// from the queue so return None in that case to properly process down stream
cachedMessages.getOrElse(Nil) match {
case Nil => None
case default => Some(default)
}
})
.getOrElse(getMessagesFromQueue(visibilityTimeout, count))
.map { internalMessage =>
// Putting the msg again into the queue, with a new next delivery
val newNextDelivery = computeNextDelivery(visibilityTimeout)
internalMessage.trackDelivery(newNextDelivery)
messageQueue += internalMessage

logger.debug(s"${queueData.name}: Receiving message ${internalMessage.id}")
internalMessage
}
})
.getOrElse(getMessagesFromQueue(visibilityTimeout, count))
.map { internalMessage =>
// Putting the msg again into the queue, with a new next delivery
val newNextDelivery = computeNextDelivery(visibilityTimeout)
internalMessage.trackDelivery(newNextDelivery)
messageQueue += internalMessage

logger.debug(s"${queueData.name}: Receiving message ${internalMessage.id}")
internalMessage

receiveRequestAttemptId.foreach { attemptId =>
receiveRequestAttemptCache.add(attemptId, messages)
}

receiveRequestAttemptId.foreach { attemptId =>
receiveRequestAttemptCache.add(attemptId, messages)
}
messages.foreach { message =>
inflightMessagesRegisty += message.id
}

messages.map(_.toMessageData)
Right(messages.map(_.toMessageData))
}
}

private def getMessagesFromRequestAttemptCache(receiveRequestAttemptId: String)(
Expand Down Expand Up @@ -177,6 +186,7 @@ trait QueueActorMessageOps extends Logging {
messageQueue.byId.get(msgId).foreach { msgData =>
if (msgData.deliveryReceipts.lastOption.contains(deliveryReceipt.receipt)) {
// Just removing the msg from the map. The msg will be removed from the queue when trying to receive it.
inflightMessagesRegisty -= msgId
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[typo] Registy. Though maybe simply name it sth like inflightMessageIds, as that's what this is

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great suggestion! Renamed

messageQueue.remove(msgId)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import akka.actor.ActorRef
import org.elasticmq.QueueData
import org.elasticmq.util.NowProvider

import scala.collection.mutable

trait QueueActorStorage {
def nowProvider: NowProvider

Expand All @@ -16,4 +18,5 @@ trait QueueActorStorage {
var queueData: QueueData = initialQueueData
var messageQueue = MessageQueue(queueData.isFifo)
val receiveRequestAttemptCache = new ReceiveRequestAttemptCache
val inflightMessagesRegisty = new mutable.TreeSet[String]()
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ trait QueueActorWaitForMessagesOps extends ReplyingActor with QueueActorMessageO
val result = super.receiveAndReplyMessageMsg(msg)
val waitForMessages =
waitForMessagesOpt.getOrElse(queueData.receiveMessageWait)
if (result == ReplyWith(Nil) && waitForMessages.getMillis > 0) {
if (result == ReplyWith(Right(Nil)) && waitForMessages.getMillis > 0) {
val seq = assignSequenceFor(rm)
logger.debug(s"${queueData.name}: Awaiting messages: start for sequence $seq.")
scheduleTimeoutReply(seq, waitForMessages)
Expand All @@ -67,13 +67,15 @@ trait QueueActorWaitForMessagesOps extends ReplyingActor with QueueActorMessageO
AwaitingData(originalSender, ReceiveMessages(visibilityTimeout, count, _, receiveRequestAttemptId), _))) =>
val received = super.receiveMessages(visibilityTimeout, count, receiveRequestAttemptId)

if (received != Nil) {
originalSender ! received
logger.debug(
s"${queueData.name}: Awaiting messages: replying to sequence $seq with ${received.size} messages.")
awaitingReply.remove(seq)
received match {
case Right(receivedList) if receivedList.nonEmpty =>
originalSender ! received
logger.debug(
s"${queueData.name}: Awaiting messages: replying to sequence $seq with ${receivedList.size} messages.")
awaitingReply.remove(seq)

tryReply()
tryReply()
case _ => ()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and what if there are requests waiting for receiving a message, but the limit is hit? Does deleting a message trigger responding to waiting requests as well?

}
case _ => // do nothing
}
Expand All @@ -87,7 +89,7 @@ trait QueueActorWaitForMessagesOps extends ReplyingActor with QueueActorMessageO
}

private def scheduleTimeoutReply(seq: Long, waitForMessages: Duration): Unit = {
schedule(waitForMessages.getMillis, ReplyIfTimeout(seq, Nil))
schedule(waitForMessages.getMillis, ReplyIfTimeout(seq, Right(Nil)))
}

private def scheduleTryReplyWhenAvailable(): Unit = {
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/elasticmq/msg/QueueMsg.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,6 @@ case class ReceiveMessages(visibilityTimeout: VisibilityTimeout,
count: Int,
waitForMessages: Option[Duration],
receiveRequestAttemptId: Option[String])
extends QueueMessageMsg[List[MessageData]]
extends QueueMessageMsg[Either[ElasticMQError, List[MessageData]]]
case class DeleteMessage(deliveryReceipt: DeliveryReceipt) extends QueueMessageMsg[Unit]
case class LookupMessage(messageId: MessageId) extends QueueMessageMsg[Option[MessageData]]
74 changes: 43 additions & 31 deletions core/src/test/scala/org/elasticmq/actor/QueueActorMsgOpsTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class QueueActorMsgOpsTest extends ActorTest with QueueManagerForEachTest with D

// When

lookupResult <- queueActor2 ? ReceiveMessages(DefaultVisibilityTimeout, 1, None, None)
Right(lookupResult) <- queueActor2 ? ReceiveMessages(DefaultVisibilityTimeout, 1, None, None)
} yield {
// Then
lookupResult should be(Nil)
Expand All @@ -91,7 +91,7 @@ class QueueActorMsgOpsTest extends ActorTest with QueueManagerForEachTest with D
_ <- queueActor1 ? SendMessage(m)

// When
lookupResult <- queueActor1 ? ReceiveMessages(DefaultVisibilityTimeout, 1, None, None)
Right(lookupResult) <- queueActor1 ? ReceiveMessages(DefaultVisibilityTimeout, 1, None, None)
} yield {
// Then
withoutDeliveryReceipt(lookupResult.headOption)
Expand Down Expand Up @@ -129,7 +129,7 @@ class QueueActorMsgOpsTest extends ActorTest with QueueManagerForEachTest with D

// When
lookupBeforeReceiving <- queueActor ? LookupMessage(MessageId("xyz"))
received <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 1, None, None)
Right(received) <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 1, None, None)
lookupAfterReceiving <- queueActor ? LookupMessage(MessageId("xyz"))
} yield {
// Then
Expand All @@ -154,9 +154,9 @@ class QueueActorMsgOpsTest extends ActorTest with QueueManagerForEachTest with D
_ <- queueActor ? SendMessage(createNewMessageData("xyz", "123", Map(), MillisNextDelivery(50L)))

// When
received1 <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 1, None, None)
Right(received1) <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 1, None, None)
_ = nowProvider.mutableNowMillis.set(101L)
received2 <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 1, None, None)
Right(received2) <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 1, None, None)
} yield {
// Then
val received1Receipt = received1.flatMap(_.deliveryReceipt)
Expand All @@ -178,7 +178,7 @@ class QueueActorMsgOpsTest extends ActorTest with QueueManagerForEachTest with D
_ <- queueActor ? SendMessage(createNewMessageData("xyz", "123", Map(), MillisNextDelivery(123L)))

// When
receiveResult <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 1, None, None)
Right(receiveResult) <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 1, None, None)
} yield {
// Then
receiveResult should be(Nil)
Expand Down Expand Up @@ -220,7 +220,7 @@ class QueueActorMsgOpsTest extends ActorTest with QueueManagerForEachTest with D
// When
_ <- queueActor ? UpdateVisibilityTimeout(m2.id.get, MillisVisibilityTimeout(10L))
_ = nowProvider.mutableNowMillis.set(110L)
receiveResult <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 1, None, None)
Right(receiveResult) <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 1, None, None)
} yield {
// Then
// This should find the first msg, as it has the visibility timeout decreased.
Expand All @@ -236,7 +236,7 @@ class QueueActorMsgOpsTest extends ActorTest with QueueManagerForEachTest with D
for {
Right(queueActor) <- queueManagerActor ? CreateQueue(q1)
_ <- queueActor ? SendMessage(m1)
List(m1data) <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 1, None, None)
Right(List(m1data)) <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 1, None, None)

// When
_ <- queueActor ? DeleteMessage(m1data.deliveryReceipt.get)
Expand All @@ -258,9 +258,9 @@ class QueueActorMsgOpsTest extends ActorTest with QueueManagerForEachTest with D

// When
Some(lookupResult) <- queueActor ? LookupMessage(m1.id.get)
List(receiveResult1) <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 1, None, None)
Right(List(receiveResult1)) <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 1, None, None)
_ = nowProvider.mutableNowMillis.set(110L)
List(receiveResult2) <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 1, None, None)
Right(List(receiveResult2)) <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 1, None, None)
} yield {
// Then
lookupResult.statistics should be(MessageStatistics.empty)
Expand All @@ -284,8 +284,8 @@ class QueueActorMsgOpsTest extends ActorTest with QueueManagerForEachTest with D
_ <- queueActor ? SendMessage(m5)

// When
receiveResults1 <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 3, None, None)
receiveResults2 <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 2, None, None)
Right(receiveResults1) <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 3, None, None)
Right(receiveResults2) <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 2, None, None)
} yield {
// Then
receiveResults1.size should be(3)
Expand All @@ -308,7 +308,7 @@ class QueueActorMsgOpsTest extends ActorTest with QueueManagerForEachTest with D
_ <- queueActor ? SendMessage(m3)

// When
receiveResults <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 5, None, None)
Right(receiveResults) <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 5, None, None)
} yield {
// Then
receiveResults.size should be(3)
Expand All @@ -326,7 +326,10 @@ class QueueActorMsgOpsTest extends ActorTest with QueueManagerForEachTest with D
Right(queueActor) <- queueManagerActor ? CreateQueue(q1)

// When
receiveResults <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 5, Some(Duration.millis(500L)), None)
Right(receiveResults) <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout,
5,
Some(Duration.millis(500L)),
None)
} yield {
// Then
val end = System.currentTimeMillis()
Expand Down Expand Up @@ -358,8 +361,8 @@ class QueueActorMsgOpsTest extends ActorTest with QueueManagerForEachTest with D
receiveResults <- receiveResultsFuture
} yield {
// Then
receiveResults.size should be(1)
receiveResults.map(_.id) should be(msg.id.toList)
receiveResults.getOrElse(List.empty).size should be(1)
receiveResults.getOrElse(List.empty).map(_.id) should be(msg.id.toList)
}
}

Expand All @@ -384,8 +387,8 @@ class QueueActorMsgOpsTest extends ActorTest with QueueManagerForEachTest with D

_ <- { Thread.sleep(500); queueActor ? SendMessage(msg) }

receiveResults1 <- receiveResults1Future
receiveResults2 <- receiveResults2Future
Right(receiveResults1) <- receiveResults1Future
Right(receiveResults2) <- receiveResults2Future
} yield {
// Then
val end = System.currentTimeMillis()
Expand Down Expand Up @@ -426,9 +429,9 @@ class QueueActorMsgOpsTest extends ActorTest with QueueManagerForEachTest with D
queueActor ? SendMessage(msg2)
}

receiveResults1 <- receiveResults1Future
receiveResults2 <- receiveResults2Future
receiveResults3 <- receiveResults3Future
Right(receiveResults1) <- receiveResults1Future
Right(receiveResults2) <- receiveResults2Future
Right(receiveResults3) <- receiveResults3Future
} yield {
// Then
List(receiveResults1.size, receiveResults2.size, receiveResults3.size).sum should be(2)
Expand All @@ -455,10 +458,13 @@ class QueueActorMsgOpsTest extends ActorTest with QueueManagerForEachTest with D
_ <- queueActor ? SendMessage(m1)

// When
receiveResults <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 5, None, None)
Right(receiveResults) <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 5, None, None)
_ = nowProvider.mutableNowMillis.set(1000L)
receiveResultsEmpty <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 5, None, None)
receiveResultsDeadLettersQueue <- deadLettersQueueActor ? ReceiveMessages(DefaultVisibilityTimeout, 5, None, None)
Right(receiveResultsEmpty) <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 5, None, None)
Right(receiveResultsDeadLettersQueue) <- deadLettersQueueActor ? ReceiveMessages(DefaultVisibilityTimeout,
5,
None,
None)
} yield {
// Then
receiveResults.size should be(1)
Expand Down Expand Up @@ -490,10 +496,13 @@ class QueueActorMsgOpsTest extends ActorTest with QueueManagerForEachTest with D
_ <- queueActor ? SendMessage(m1)

// When
receiveResults <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 5, None, None)
Right(receiveResults) <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 5, None, None)
_ = nowProvider.mutableNowMillis.set(1000L)
receiveResultsEmpty <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 5, None, None)
receiveResultsDeadLettersQueue <- deadLettersQueueActor ? ReceiveMessages(DefaultVisibilityTimeout, 5, None, None)
Right(receiveResultsEmpty) <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 5, None, None)
Right(receiveResultsDeadLettersQueue) <- deadLettersQueueActor ? ReceiveMessages(DefaultVisibilityTimeout,
5,
None,
None)
} yield {
// Then
receiveResults.size should be(1)
Expand All @@ -519,8 +528,8 @@ class QueueActorMsgOpsTest extends ActorTest with QueueManagerForEachTest with D
// When
sendAck <- queueActor ? SendMessage(m1)
_ = nowProvider.mutableNowMillis.set(1000L)
receiveResultsOriginal <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 5, None, None)
receiveResultsMoved <- redirectToActor ? ReceiveMessages(DefaultVisibilityTimeout, 5, None, None)
Right(receiveResultsOriginal) <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 5, None, None)
Right(receiveResultsMoved) <- redirectToActor ? ReceiveMessages(DefaultVisibilityTimeout, 5, None, None)
} yield {
// Then
sendAck.id should be(MessageId(m1ID))
Expand All @@ -543,8 +552,11 @@ class QueueActorMsgOpsTest extends ActorTest with QueueManagerForEachTest with D
// When
_ <- queueActor ? SendMessage(m1)
_ = nowProvider.mutableNowMillis.set(1000L)
resultsOriginal <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 5, None, None)
resultsCopied <- copyToActor ? ReceiveMessages(DefaultVisibilityTimeout, 5, Some(Duration.millis(500)), None)
Right(resultsOriginal) <- queueActor ? ReceiveMessages(DefaultVisibilityTimeout, 5, None, None)
Right(resultsCopied) <- copyToActor ? ReceiveMessages(DefaultVisibilityTimeout,
5,
Some(Duration.millis(500)),
None)
} yield {
// Then
resultsOriginal.map(_.id) should be(List(MessageId(m1ID)))
Expand Down
Loading