Skip to content

Commit 4d1160e

Browse files
author
Patrick Deasy
committed
AMQ-7397 Add preserveDeliveryMode to deadLetterStrategy.
AMQ-7397 Move Message preparation back to RegionBroker. AMQ-7397 Add message persistence tests for dead letter strategy.
1 parent 3400983 commit 4d1160e

File tree

4 files changed

+225
-17
lines changed

4 files changed

+225
-17
lines changed

activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -781,23 +781,8 @@ public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference
781781
return false;
782782
}
783783

784-
// message may be inflight to other subscriptions so do not modify
785-
message = message.copy();
786-
long dlqExpiration = deadLetterStrategy.getExpiration();
787-
if (dlqExpiration > 0) {
788-
dlqExpiration += System.currentTimeMillis();
789-
} else {
790-
stampAsExpired(message);
791-
}
792-
message.setExpiration(dlqExpiration);
793-
if (!message.isPersistent()) {
794-
message.setPersistent(true);
795-
message.setProperty("originalDeliveryMode", "NON_PERSISTENT");
796-
}
797-
if (poisonCause != null) {
798-
message.setProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY,
799-
poisonCause.toString());
800-
}
784+
message = prepareMessageForDeadLetterQueue(message, deadLetterStrategy, poisonCause);
785+
801786
// The original destination and transaction id do
802787
// not get filled when the message is first sent,
803788
// it is only populated if the message is routed to
@@ -822,6 +807,28 @@ public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference
822807
return false;
823808
}
824809

810+
private Message prepareMessageForDeadLetterQueue(Message message, DeadLetterStrategy deadLetterStrategy, Throwable poisonCause) throws IOException {
811+
// message may be inflight to other subscriptions so do not modify
812+
message = message.copy();
813+
long dlqExpiration = deadLetterStrategy.getExpiration();
814+
if (dlqExpiration > 0) {
815+
dlqExpiration += System.currentTimeMillis();
816+
} else {
817+
stampAsExpired(message);
818+
}
819+
message.setExpiration(dlqExpiration);
820+
if (!message.isPersistent() && !deadLetterStrategy.isPreserveDeliveryMode()) {
821+
message.setPersistent(true);
822+
message.setProperty("originalDeliveryMode", "NON_PERSISTENT");
823+
}
824+
if (poisonCause != null) {
825+
message.setProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY,
826+
poisonCause.toString());
827+
}
828+
829+
return message;
830+
}
831+
825832
@Override
826833
public Broker getRoot() {
827834
try {

activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,13 @@
1717
package org.apache.activemq.broker.region.policy;
1818

1919
import org.apache.activemq.ActiveMQMessageAudit;
20+
import org.apache.activemq.command.ActiveMQMessage;
2021
import org.apache.activemq.command.Message;
2122
import org.slf4j.Logger;
2223
import org.slf4j.LoggerFactory;
2324

25+
import java.io.IOException;
26+
2427
/**
2528
* A strategy for choosing which destination is used for dead letter queue
2629
* messages.
@@ -30,6 +33,7 @@ public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy {
3033
private static final Logger LOG = LoggerFactory.getLogger(AbstractDeadLetterStrategy.class);
3134
private boolean processNonPersistent = false;
3235
private boolean processExpired = true;
36+
private boolean preserveDeliveryMode = false;
3337
private boolean enableAudit = true;
3438
private long expiration;
3539

@@ -91,6 +95,14 @@ public void setProcessNonPersistent(boolean processNonPersistent) {
9195
this.processNonPersistent = processNonPersistent;
9296
}
9397

98+
@Override
99+
public boolean isPreserveDeliveryMode() { return this.preserveDeliveryMode; }
100+
101+
@Override
102+
public void setPreserveDeliveryMode(boolean preserveDeliveryMode) {
103+
this.preserveDeliveryMode = preserveDeliveryMode;
104+
}
105+
94106
public boolean isEnableAudit() {
95107
return enableAudit;
96108
}

activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,16 @@ public interface DeadLetterStrategy {
6060
*/
6161
public void setProcessNonPersistent(boolean processNonPersistent);
6262

63+
/**
64+
* @return the preserveDeliveryMode
65+
*/
66+
public boolean isPreserveDeliveryMode();
67+
68+
/**
69+
* @param preserveDeliveryMode the preserveDeliveryMode to set
70+
*/
71+
public void setPreserveDeliveryMode(boolean preserveDeliveryMode);
72+
6373
/**
6474
* Allows for a Message that was already processed by a DLQ to be rolled back in case
6575
* of a move or a retry of that message, otherwise the Message would be considered a
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.activemq.broker.policy;
18+
19+
import jakarta.jms.*;
20+
import org.apache.activemq.ActiveMQConnection;
21+
import org.apache.activemq.broker.BrokerService;
22+
import org.apache.activemq.broker.region.policy.*;
23+
import org.apache.activemq.command.ActiveMQQueue;
24+
import org.slf4j.Logger;
25+
import org.slf4j.LoggerFactory;
26+
27+
public class DeadLetterPersistenceTest extends DeadLetterTest {
28+
private static final Logger LOG = LoggerFactory.getLogger(DiscardingDeadLetterPolicyTest.class);
29+
private static final String CLIENT_ID = "clientID";
30+
private static final String NON_PERSISTENT_DEST = "nonPersistentDest";
31+
private static final String PRESERVE_DELIVERY_DEST = "preserveDeliveryDest";
32+
33+
@Override
34+
protected BrokerService createBroker() throws Exception {
35+
BrokerService broker = super.createBroker();
36+
37+
PolicyEntry policy = new PolicyEntry();
38+
IndividualDeadLetterStrategy strategy = new IndividualDeadLetterStrategy();
39+
strategy.setProcessNonPersistent(true);
40+
strategy.setDestinationPerDurableSubscriber(true);
41+
policy.setDeadLetterStrategy(strategy);
42+
43+
PolicyMap pMap = new PolicyMap();
44+
pMap.setDefaultEntry(policy);
45+
46+
SharedDeadLetterStrategy processNonPersistent = new SharedDeadLetterStrategy();
47+
processNonPersistent.setDeadLetterQueue(new ActiveMQQueue("DLQ." + NON_PERSISTENT_DEST));
48+
processNonPersistent.setProcessNonPersistent(true);
49+
PolicyEntry processNonPersistentDlqPolicy = new PolicyEntry();
50+
processNonPersistentDlqPolicy.setDeadLetterStrategy(processNonPersistent);
51+
52+
pMap.put(new ActiveMQQueue(NON_PERSISTENT_DEST), processNonPersistentDlqPolicy);
53+
54+
SharedDeadLetterStrategy processPreserveDelivery = new SharedDeadLetterStrategy();
55+
processPreserveDelivery.setDeadLetterQueue(new ActiveMQQueue("DLQ." + PRESERVE_DELIVERY_DEST));
56+
processPreserveDelivery.setProcessNonPersistent(true);
57+
processPreserveDelivery.setPreserveDeliveryMode(true);
58+
PolicyEntry processPreserveDeliveryDlqPolicy = new PolicyEntry();
59+
processPreserveDeliveryDlqPolicy.setDeadLetterStrategy(processPreserveDelivery);
60+
61+
pMap.put(new ActiveMQQueue(PRESERVE_DELIVERY_DEST), processPreserveDeliveryDlqPolicy);
62+
63+
broker.setDestinationPolicy(pMap);
64+
65+
return broker;
66+
}
67+
68+
@Override
69+
protected String createClientId() {
70+
return CLIENT_ID;
71+
}
72+
73+
@Override
74+
protected Destination createDlqDestination() {
75+
String prefix = topic ? "ActiveMQ.DLQ.Topic." : "ActiveMQ.DLQ.Queue.";
76+
String destinationName = prefix + getClass().getName() + "." + getName();
77+
if (durableSubscriber) {
78+
String subName = // connectionId:SubName
79+
CLIENT_ID + ":" + getDestination().toString();
80+
destinationName += "." + subName ;
81+
}
82+
return new ActiveMQQueue(destinationName);
83+
}
84+
85+
@Override
86+
protected void doTest() throws Exception {
87+
validateMessagePersistentSetToTrueWhenProducerIsPersistent();
88+
validateMessagePersistentSetToTrueWhenProducerIsNonPeristent();
89+
validateMessagePersitentNotSetWhenPreserveDeliveryModeIsTrue();
90+
}
91+
92+
public void validateMessagePersistentSetToTrueWhenProducerIsPersistent() throws Exception {
93+
messageCount = 1;
94+
connection.start();
95+
96+
ActiveMQConnection amqConnection = (ActiveMQConnection) connection;
97+
rollbackCount = amqConnection.getRedeliveryPolicy().getMaximumRedeliveries() + 1;
98+
99+
makeConsumer();
100+
makeDlqConsumer();
101+
sendMessages();
102+
103+
for (int i = 0; i < messageCount; i++) {
104+
consumeAndRollback(i);
105+
}
106+
107+
for (int i = 0; i < messageCount; i++) {
108+
Message msg = dlqConsumer.receive(1000);
109+
assertNotNull("Should be a DLQ message for loop: " + i, msg);
110+
org.apache.activemq.command.Message commandMsg = (org.apache.activemq.command.Message ) msg;
111+
assertTrue(commandMsg.isPersistent());
112+
}
113+
114+
session.commit();
115+
}
116+
117+
public void validateMessagePersistentSetToTrueWhenProducerIsNonPeristent() throws Exception {
118+
messageCount = 1;
119+
destination = new ActiveMQQueue(NON_PERSISTENT_DEST);
120+
durableSubscriber = false;
121+
deliveryMode = DeliveryMode.NON_PERSISTENT;
122+
connection.start();
123+
124+
ActiveMQConnection amqConnection = (ActiveMQConnection) connection;
125+
rollbackCount = amqConnection.getRedeliveryPolicy().getMaximumRedeliveries() + 1;
126+
127+
makeConsumer();
128+
makeDlqConsumer();
129+
sendMessages();
130+
131+
for (int i = 0; i < messageCount; i++) {
132+
consumeAndRollback(i);
133+
}
134+
135+
dlqDestination = new ActiveMQQueue("DLQ." + NON_PERSISTENT_DEST);
136+
dlqConsumer = session.createConsumer(dlqDestination);
137+
138+
for (int i = 0; i < messageCount; i++) {
139+
Message msg = dlqConsumer.receive(1000);
140+
assertNotNull("Should be a DLQ message for loop: " + i, msg);
141+
assertEquals("NON_PERSISTENT", msg.getStringProperty("originalDeliveryMode"));
142+
org.apache.activemq.command.Message commandMsg = (org.apache.activemq.command.Message ) msg;
143+
assertTrue(commandMsg.isPersistent());
144+
}
145+
146+
session.commit();
147+
}
148+
149+
public void validateMessagePersitentNotSetWhenPreserveDeliveryModeIsTrue() throws Exception {
150+
messageCount = 1;
151+
destination = new ActiveMQQueue(PRESERVE_DELIVERY_DEST);
152+
durableSubscriber = false;
153+
deliveryMode = DeliveryMode.NON_PERSISTENT;
154+
connection.start();
155+
156+
ActiveMQConnection amqConnection = (ActiveMQConnection) connection;
157+
rollbackCount = amqConnection.getRedeliveryPolicy().getMaximumRedeliveries() + 1;
158+
159+
makeConsumer();
160+
makeDlqConsumer();
161+
sendMessages();
162+
163+
for (int i = 0; i < messageCount; i++) {
164+
consumeAndRollback(i);
165+
}
166+
167+
dlqDestination = new ActiveMQQueue("DLQ." + PRESERVE_DELIVERY_DEST);
168+
dlqConsumer = session.createConsumer(dlqDestination);
169+
170+
for (int i = 0; i < messageCount; i++) {
171+
Message msg = dlqConsumer.receive(1000);
172+
assertNotNull("Should be a DLQ message for loop: " + i, msg);
173+
org.apache.activemq.command.Message commandMsg = (org.apache.activemq.command.Message ) msg;
174+
assertFalse(commandMsg.isPersistent());
175+
}
176+
177+
session.commit();
178+
}
179+
}

0 commit comments

Comments
 (0)