Skip to content

Commit 05facc8

Browse files
author
Patrick Deasy
committed
AMQ-7397 Add preserveDeliveryMode to deadLetterStrategy.
AMQ-7397 Move Message preparation back to RegionBroker. AMQ-7397 Add message persistence tests for dead letter strategy.
1 parent 3400983 commit 05facc8

File tree

4 files changed

+209
-17
lines changed

4 files changed

+209
-17
lines changed

activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -781,23 +781,8 @@ public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference
781781
return false;
782782
}
783783

784-
// message may be inflight to other subscriptions so do not modify
785-
message = message.copy();
786-
long dlqExpiration = deadLetterStrategy.getExpiration();
787-
if (dlqExpiration > 0) {
788-
dlqExpiration += System.currentTimeMillis();
789-
} else {
790-
stampAsExpired(message);
791-
}
792-
message.setExpiration(dlqExpiration);
793-
if (!message.isPersistent()) {
794-
message.setPersistent(true);
795-
message.setProperty("originalDeliveryMode", "NON_PERSISTENT");
796-
}
797-
if (poisonCause != null) {
798-
message.setProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY,
799-
poisonCause.toString());
800-
}
784+
message = prepareMessageForDeadLetterQueue(message, deadLetterStrategy, poisonCause);
785+
801786
// The original destination and transaction id do
802787
// not get filled when the message is first sent,
803788
// it is only populated if the message is routed to
@@ -822,6 +807,28 @@ public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference
822807
return false;
823808
}
824809

810+
private Message prepareMessageForDeadLetterQueue(Message message, DeadLetterStrategy deadLetterStrategy, Throwable poisonCause) throws IOException {
811+
// message may be inflight to other subscriptions so do not modify
812+
message = message.copy();
813+
long dlqExpiration = deadLetterStrategy.getExpiration();
814+
if (dlqExpiration > 0) {
815+
dlqExpiration += System.currentTimeMillis();
816+
} else {
817+
stampAsExpired(message);
818+
}
819+
message.setExpiration(dlqExpiration);
820+
if (!message.isPersistent() && !deadLetterStrategy.isPreserveDeliveryMode()) {
821+
message.setPersistent(true);
822+
message.setProperty("originalDeliveryMode", "NON_PERSISTENT");
823+
}
824+
if (poisonCause != null) {
825+
message.setProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY,
826+
poisonCause.toString());
827+
}
828+
829+
return message;
830+
}
831+
825832
@Override
826833
public Broker getRoot() {
827834
try {

activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,13 @@
1717
package org.apache.activemq.broker.region.policy;
1818

1919
import org.apache.activemq.ActiveMQMessageAudit;
20+
import org.apache.activemq.command.ActiveMQMessage;
2021
import org.apache.activemq.command.Message;
2122
import org.slf4j.Logger;
2223
import org.slf4j.LoggerFactory;
2324

25+
import java.io.IOException;
26+
2427
/**
2528
* A strategy for choosing which destination is used for dead letter queue
2629
* messages.
@@ -30,6 +33,7 @@ public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy {
3033
private static final Logger LOG = LoggerFactory.getLogger(AbstractDeadLetterStrategy.class);
3134
private boolean processNonPersistent = false;
3235
private boolean processExpired = true;
36+
private boolean preserveDeliveryMode = false;
3337
private boolean enableAudit = true;
3438
private long expiration;
3539

@@ -91,6 +95,14 @@ public void setProcessNonPersistent(boolean processNonPersistent) {
9195
this.processNonPersistent = processNonPersistent;
9296
}
9397

98+
@Override
99+
public boolean isPreserveDeliveryMode() { return this.preserveDeliveryMode; }
100+
101+
@Override
102+
public void setPreserveDeliveryMode(boolean preserveDeliveryMode) {
103+
this.preserveDeliveryMode = preserveDeliveryMode;
104+
}
105+
94106
public boolean isEnableAudit() {
95107
return enableAudit;
96108
}

activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,16 @@ public interface DeadLetterStrategy {
6060
*/
6161
public void setProcessNonPersistent(boolean processNonPersistent);
6262

63+
/**
64+
* @return the PreserveDeliveryMode
65+
*/
66+
public boolean isPreserveDeliveryMode();
67+
68+
/**
69+
* @param PreserveDeliveryMode the PreserveDeliveryMode to set
70+
*/
71+
public void setPreserveDeliveryMode(boolean PreserveDeliveryMode);
72+
6373
/**
6474
* Allows for a Message that was already processed by a DLQ to be rolled back in case
6575
* of a move or a retry of that message, otherwise the Message would be considered a
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
package org.apache.activemq.broker.policy;
2+
3+
import jakarta.jms.*;
4+
import org.apache.activemq.ActiveMQConnection;
5+
import org.apache.activemq.broker.BrokerService;
6+
import org.apache.activemq.broker.region.policy.*;
7+
import org.apache.activemq.command.ActiveMQQueue;
8+
import org.slf4j.Logger;
9+
import org.slf4j.LoggerFactory;
10+
11+
public class DeadLetterPersistenceTest extends DeadLetterTest {
12+
private static final Logger LOG = LoggerFactory.getLogger(DiscardingDeadLetterPolicyTest.class);
13+
private static final String CLIENT_ID = "clientID";
14+
private static final String NON_PERSISTENT_DEST = "nonPersistentDest";
15+
private static final String PRESERVE_DELIVERY_DEST = "preserveDeliveryDest";
16+
17+
@Override
18+
protected BrokerService createBroker() throws Exception {
19+
BrokerService broker = super.createBroker();
20+
21+
PolicyEntry policy = new PolicyEntry();
22+
IndividualDeadLetterStrategy strategy = new IndividualDeadLetterStrategy();
23+
strategy.setProcessNonPersistent(true);
24+
strategy.setDestinationPerDurableSubscriber(true);
25+
policy.setDeadLetterStrategy(strategy);
26+
27+
PolicyMap pMap = new PolicyMap();
28+
pMap.setDefaultEntry(policy);
29+
30+
SharedDeadLetterStrategy processNonPersistent = new SharedDeadLetterStrategy();
31+
processNonPersistent.setDeadLetterQueue(new ActiveMQQueue("DLQ." + NON_PERSISTENT_DEST));
32+
processNonPersistent.setProcessNonPersistent(true);
33+
PolicyEntry processNonPersistentDlqPolicy = new PolicyEntry();
34+
processNonPersistentDlqPolicy.setDeadLetterStrategy(processNonPersistent);
35+
36+
pMap.put(new ActiveMQQueue(NON_PERSISTENT_DEST), processNonPersistentDlqPolicy);
37+
38+
SharedDeadLetterStrategy processPreserveDelivery = new SharedDeadLetterStrategy();
39+
processPreserveDelivery.setDeadLetterQueue(new ActiveMQQueue("DLQ." + PRESERVE_DELIVERY_DEST));
40+
processPreserveDelivery.setProcessNonPersistent(true);
41+
processPreserveDelivery.setPreserveDeliveryMode(true);
42+
PolicyEntry processPreserveDeliveryDlqPolicy = new PolicyEntry();
43+
processPreserveDeliveryDlqPolicy.setDeadLetterStrategy(processPreserveDelivery);
44+
45+
pMap.put(new ActiveMQQueue(PRESERVE_DELIVERY_DEST), processPreserveDeliveryDlqPolicy);
46+
47+
broker.setDestinationPolicy(pMap);
48+
49+
return broker;
50+
}
51+
52+
@Override
53+
protected String createClientId() {
54+
return CLIENT_ID;
55+
}
56+
57+
@Override
58+
protected Destination createDlqDestination() {
59+
String prefix = topic ? "ActiveMQ.DLQ.Topic." : "ActiveMQ.DLQ.Queue.";
60+
String destinationName = prefix + getClass().getName() + "." + getName();
61+
if (durableSubscriber) {
62+
String subName = // connectionId:SubName
63+
CLIENT_ID + ":" + getDestination().toString();
64+
destinationName += "." + subName ;
65+
}
66+
return new ActiveMQQueue(destinationName);
67+
}
68+
69+
@Override
70+
protected void doTest() throws Exception {
71+
validateMessagePersistentSetToTrueWhenProducerIsPersistent();
72+
validateMessagePersistentSetToTrueWhenProducerIsNonPeristent();
73+
validateMessagePersitentNotSetWhenPreserveDeliveryModeIsTrue();
74+
}
75+
76+
public void validateMessagePersistentSetToTrueWhenProducerIsPersistent() throws Exception {
77+
messageCount = 1;
78+
connection.start();
79+
80+
ActiveMQConnection amqConnection = (ActiveMQConnection) connection;
81+
rollbackCount = amqConnection.getRedeliveryPolicy().getMaximumRedeliveries() + 1;
82+
83+
makeConsumer();
84+
makeDlqConsumer();
85+
sendMessages();
86+
87+
for (int i = 0; i < messageCount; i++) {
88+
consumeAndRollback(i);
89+
}
90+
91+
for (int i = 0; i < messageCount; i++) {
92+
Message msg = dlqConsumer.receive(1000);
93+
assertNotNull("Should be a DLQ message for loop: " + i, msg);
94+
org.apache.activemq.command.Message commandMsg = (org.apache.activemq.command.Message ) msg;
95+
assertTrue(commandMsg.isPersistent());
96+
}
97+
98+
session.commit();
99+
}
100+
101+
public void validateMessagePersistentSetToTrueWhenProducerIsNonPeristent() throws Exception {
102+
messageCount = 1;
103+
destination = new ActiveMQQueue(NON_PERSISTENT_DEST);
104+
durableSubscriber = false;
105+
deliveryMode = DeliveryMode.NON_PERSISTENT;
106+
connection.start();
107+
108+
ActiveMQConnection amqConnection = (ActiveMQConnection) connection;
109+
rollbackCount = amqConnection.getRedeliveryPolicy().getMaximumRedeliveries() + 1;
110+
111+
makeConsumer();
112+
makeDlqConsumer();
113+
sendMessages();
114+
115+
for (int i = 0; i < messageCount; i++) {
116+
consumeAndRollback(i);
117+
}
118+
119+
dlqDestination = new ActiveMQQueue("DLQ." + NON_PERSISTENT_DEST);
120+
dlqConsumer = session.createConsumer(dlqDestination);
121+
122+
for (int i = 0; i < messageCount; i++) {
123+
Message msg = dlqConsumer.receive(1000);
124+
assertNotNull("Should be a DLQ message for loop: " + i, msg);
125+
assertEquals("NON_PERSISTENT", msg.getStringProperty("originalDeliveryMode"));
126+
org.apache.activemq.command.Message commandMsg = (org.apache.activemq.command.Message ) msg;
127+
assertTrue(commandMsg.isPersistent());
128+
}
129+
130+
session.commit();
131+
}
132+
133+
public void validateMessagePersitentNotSetWhenPreserveDeliveryModeIsTrue() throws Exception {
134+
messageCount = 1;
135+
destination = new ActiveMQQueue(PRESERVE_DELIVERY_DEST);
136+
durableSubscriber = false;
137+
deliveryMode = DeliveryMode.NON_PERSISTENT;
138+
connection.start();
139+
140+
ActiveMQConnection amqConnection = (ActiveMQConnection) connection;
141+
rollbackCount = amqConnection.getRedeliveryPolicy().getMaximumRedeliveries() + 1;
142+
143+
makeConsumer();
144+
makeDlqConsumer();
145+
sendMessages();
146+
147+
for (int i = 0; i < messageCount; i++) {
148+
consumeAndRollback(i);
149+
}
150+
151+
dlqDestination = new ActiveMQQueue("DLQ." + PRESERVE_DELIVERY_DEST);
152+
dlqConsumer = session.createConsumer(dlqDestination);
153+
154+
for (int i = 0; i < messageCount; i++) {
155+
Message msg = dlqConsumer.receive(1000);
156+
assertNotNull("Should be a DLQ message for loop: " + i, msg);
157+
org.apache.activemq.command.Message commandMsg = (org.apache.activemq.command.Message ) msg;
158+
assertFalse(commandMsg.isPersistent());
159+
}
160+
161+
session.commit();
162+
}
163+
}

0 commit comments

Comments
 (0)