Skip to content

Commit 53d7b33

Browse files
committed
return DeadLetterQueueSourceArn in receive message attributes
1 parent 092e541 commit 53d7b33

File tree

5 files changed

+73
-9
lines changed

5 files changed

+73
-9
lines changed

rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/AmazonJavaSdkTestSuite.scala

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2097,6 +2097,35 @@ class AmazonJavaSdkTestSuite extends SqsClientServerCommunication with Matchers
20972097
secondReceiveResult.getMessages shouldBe empty
20982098
}
20992099

2100+
test("should return DeadLetterQueueSourceArn in receive message attributes") {
2101+
// given
2102+
val messageBody = "Message 1"
2103+
client.createQueue(new CreateQueueRequest("testDlq")).getQueueUrl
2104+
val redrivePolicy = RedrivePolicy("testDlq", awsRegion, awsAccountId, 1).toJson.toString
2105+
2106+
val createQueueResult = client
2107+
.createQueue(
2108+
new CreateQueueRequest("main")
2109+
.withAttributes(
2110+
Map(redrivePolicyAttribute -> redrivePolicy).asJava
2111+
)
2112+
)
2113+
.getQueueUrl
2114+
2115+
// when
2116+
client.sendMessage(createQueueResult, messageBody)
2117+
val receiveResult = client.receiveMessage(
2118+
new ReceiveMessageRequest()
2119+
.withQueueUrl(createQueueResult)
2120+
.withAttributeNames("All")
2121+
)
2122+
2123+
// then
2124+
receiveResult.getMessages.asScala.toList.flatMap(_.getAttributes.asScala.toList) should contain(
2125+
("DeadLetterQueueSourceArn", s"arn:aws:sqs:$awsRegion:$awsAccountId:testDlq")
2126+
)
2127+
}
2128+
21002129
test("should list all source queues for a dlq") {
21012130
// given
21022131
val dlqUrl = client.createQueue(new CreateQueueRequest("testDlq")).getQueueUrl

rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/AmazonJavaSdkV2TestSuite.scala

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
package org.elasticmq.rest.sqs
22

3+
import org.elasticmq.rest.sqs.model.RedrivePolicy
34
import org.elasticmq.{BinaryMessageAttribute, MessageAttribute, NumberMessageAttribute, StringMessageAttribute}
5+
import org.elasticmq.rest.sqs.model.RedrivePolicyJson.format
46
import org.scalatest.matchers.should.Matchers
57
import software.amazon.awssdk.core.SdkBytes
68
import software.amazon.awssdk.services.sqs.model.{GetQueueUrlRequest => AwsSdkGetQueueUrlRequest, _}
9+
import spray.json.enrichAny
710

811
import scala.collection.JavaConverters._
912

@@ -80,6 +83,30 @@ class AmazonJavaSdkV2TestSuite extends SqsClientServerWithSdkV2Communication wit
8083
)
8184
}
8285

86+
test("should return DeadLetterQueueSourceArn in receive message attributes") {
87+
// Given
88+
clientV2.createQueue(CreateQueueRequest.builder().queueName("testDlq").build())
89+
val queue = clientV2.createQueue(CreateQueueRequest.builder()
90+
.queueName("testQueue1")
91+
.attributes(Map(QueueAttributeName.REDRIVE_POLICY -> RedrivePolicy("testDlq", awsRegion, awsAccountId, 1).toJson.toString).asJava)
92+
.build())
93+
94+
// When
95+
clientV2.sendMessage(SendMessageRequest.builder()
96+
.queueUrl(queue.queueUrl())
97+
.messageBody("test123")
98+
.build())
99+
val receiveResult = clientV2.receiveMessage(ReceiveMessageRequest.builder()
100+
.queueUrl(queue.queueUrl())
101+
.attributeNamesWithStrings("All")
102+
.build())
103+
104+
// Then
105+
receiveResult.messages().asScala.toList.flatMap(_.attributes().asScala.toList) should contain(
106+
(MessageSystemAttributeName.DEAD_LETTER_QUEUE_SOURCE_ARN, s"arn:aws:sqs:$awsRegion:$awsAccountId:testDlq")
107+
)
108+
}
109+
83110
private def doTestSendAndReceiveMessage(content: String): Unit = {
84111
doTestSendAndReceiveMessageWithAttributes(content, Map(), List())
85112
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package org.elasticmq.rest.sqs
2+
3+
trait AwsConfiguration {
4+
5+
def awsRegion: String
6+
7+
def awsAccountId: String
8+
9+
def getArn(queueName: String): String = s"arn:aws:sqs:$awsRegion:$awsAccountId:$queueName"
10+
}

rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/QueueAttributesOps.scala

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,9 @@ import java.time.Duration
1717
import scala.async.Async.{async, await}
1818
import scala.concurrent.{ExecutionContext, Future}
1919

20-
trait QueueAttributesOps extends AttributesModule {
20+
trait QueueAttributesOps extends AttributesModule with AwsConfiguration {
2121
this: Logging =>
2222

23-
def awsRegion: String
24-
25-
def awsAccountId: String
26-
2723
val attributeValuesCalculator = new AttributeValuesCalculator
2824

2925
def getAllQueueAttributes(queueActor: ActorRef, queueData: QueueData)(implicit
@@ -75,7 +71,7 @@ trait QueueAttributesOps extends AttributesModule {
7571
),
7672
AttributeValuesCalculator.Rule(
7773
QueueArnAttribute,
78-
() => Future.successful(s"arn:aws:sqs:$awsRegion:$awsAccountId:${queueData.name}")
74+
() => Future.successful(getArn(queueData.name))
7975
)
8076
)
8177

rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/ReceiveMessageDirectives.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import java.time.Duration
1616
import scala.xml.Elem
1717

1818
trait ReceiveMessageDirectives {
19-
this: ElasticMQDirectives with AttributesModule with SQSLimitsModule with ResponseMarshaller =>
19+
this: ElasticMQDirectives with AttributesModule with SQSLimitsModule with ResponseMarshaller with AwsConfiguration =>
2020
object MessageReadeableAttributeNames {
2121
val SentTimestampAttribute = "SentTimestamp"
2222
val ApproximateReceiveCountAttribute = "ApproximateReceiveCount"
@@ -31,10 +31,11 @@ trait ReceiveMessageDirectives {
3131
val MessageGroupIdAttribute = "MessageGroupId"
3232
val AWSTraceHeaderAttribute = "AWSTraceHeader"
3333
val SequenceNumberAttribute = "SequenceNumber"
34+
val DeadLetterQueueSourceArn = "DeadLetterQueueSourceArn"
3435

3536
val AllAttributeNames = SentTimestampAttribute :: ApproximateReceiveCountAttribute ::
3637
ApproximateFirstReceiveTimestampAttribute :: SenderIdAttribute :: MessageDeduplicationIdAttribute ::
37-
MessageGroupIdAttribute :: AWSTraceHeaderAttribute :: SequenceNumberAttribute :: Nil
38+
MessageGroupIdAttribute :: AWSTraceHeaderAttribute :: SequenceNumberAttribute :: DeadLetterQueueSourceArn :: Nil
3839
}
3940

4041
def receiveMessage(p: RequestPayload)(implicit marshallerDependencies: MarshallerDependencies) = {
@@ -112,7 +113,8 @@ trait ReceiveMessageDirectives {
112113
}).toString)
113114
),
114115
Rule(AWSTraceHeaderAttribute, () => msg.tracingId.map(_.id)),
115-
Rule(SequenceNumberAttribute, () => msg.sequenceNumber)
116+
Rule(SequenceNumberAttribute, () => msg.sequenceNumber),
117+
Rule(DeadLetterQueueSourceArn, () => queueData.deadLettersQueue.map(qd => getArn(qd.name)))
116118
)
117119
}
118120

0 commit comments

Comments
 (0)