Skip to content

Commit 7e810c4

Browse files
author
Patrick Deasy
committed
AMQ-7397 Add message persistence tests for dead letter strategy.
1 parent 862acda commit 7e810c4

File tree

2 files changed

+163
-2
lines changed

2 files changed

+163
-2
lines changed

activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@
2020
import org.apache.activemq.command.ActiveMQDestination;
2121
import org.apache.activemq.command.Message;
2222

23-
import java.io.IOException;
24-
2523
/**
2624
* A strategy for choosing which destination is used for dead letter queue messages.
2725
*
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
package org.apache.activemq.broker.policy;
2+
3+
import jakarta.jms.*;
4+
import org.apache.activemq.ActiveMQConnection;
5+
import org.apache.activemq.broker.BrokerService;
6+
import org.apache.activemq.broker.region.policy.*;
7+
import org.apache.activemq.command.ActiveMQQueue;
8+
import org.slf4j.Logger;
9+
import org.slf4j.LoggerFactory;
10+
11+
public class DeadLetterPersistenceTest extends DeadLetterTest {
12+
private static final Logger LOG = LoggerFactory.getLogger(DiscardingDeadLetterPolicyTest.class);
13+
private static final String CLIENT_ID = "clientID";
14+
private static final String NON_PERSISTENT_DEST = "nonPersistentDest";
15+
private static final String PRESERVE_DELIVERY_DEST = "preserveDeliveryDest";
16+
17+
@Override
18+
protected BrokerService createBroker() throws Exception {
19+
BrokerService broker = super.createBroker();
20+
21+
PolicyEntry policy = new PolicyEntry();
22+
IndividualDeadLetterStrategy strategy = new IndividualDeadLetterStrategy();
23+
strategy.setProcessNonPersistent(true);
24+
strategy.setDestinationPerDurableSubscriber(true);
25+
policy.setDeadLetterStrategy(strategy);
26+
27+
PolicyMap pMap = new PolicyMap();
28+
pMap.setDefaultEntry(policy);
29+
30+
SharedDeadLetterStrategy processNonPersistent = new SharedDeadLetterStrategy();
31+
processNonPersistent.setDeadLetterQueue(new ActiveMQQueue("DLQ." + NON_PERSISTENT_DEST));
32+
processNonPersistent.setProcessNonPersistent(true);
33+
PolicyEntry processNonPersistentDlqPolicy = new PolicyEntry();
34+
processNonPersistentDlqPolicy.setDeadLetterStrategy(processNonPersistent);
35+
36+
pMap.put(new ActiveMQQueue(NON_PERSISTENT_DEST), processNonPersistentDlqPolicy);
37+
38+
SharedDeadLetterStrategy processPreserveDelivery = new SharedDeadLetterStrategy();
39+
processPreserveDelivery.setDeadLetterQueue(new ActiveMQQueue("DLQ." + PRESERVE_DELIVERY_DEST));
40+
processPreserveDelivery.setProcessNonPersistent(true);
41+
processPreserveDelivery.setPreserveDeliveryMode(true);
42+
PolicyEntry processPreserveDeliveryDlqPolicy = new PolicyEntry();
43+
processPreserveDeliveryDlqPolicy.setDeadLetterStrategy(processPreserveDelivery);
44+
45+
pMap.put(new ActiveMQQueue(PRESERVE_DELIVERY_DEST), processPreserveDeliveryDlqPolicy);
46+
47+
broker.setDestinationPolicy(pMap);
48+
49+
return broker;
50+
}
51+
52+
@Override
53+
protected String createClientId() {
54+
return CLIENT_ID;
55+
}
56+
57+
@Override
58+
protected Destination createDlqDestination() {
59+
String prefix = topic ? "ActiveMQ.DLQ.Topic." : "ActiveMQ.DLQ.Queue.";
60+
String destinationName = prefix + getClass().getName() + "." + getName();
61+
if (durableSubscriber) {
62+
String subName = // connectionId:SubName
63+
CLIENT_ID + ":" + getDestination().toString();
64+
destinationName += "." + subName ;
65+
}
66+
return new ActiveMQQueue(destinationName);
67+
}
68+
69+
@Override
70+
protected void doTest() throws Exception {
71+
validateMessagePersistentSetToTrueWhenProducerIsPersistent();
72+
validateMessagePersistentSetToTrueWhenProducerIsNonPeristent();
73+
validateMessagePersitentNotSetWhenPreserveDeliveryModeIsTrue();
74+
}
75+
76+
public void validateMessagePersistentSetToTrueWhenProducerIsPersistent() throws Exception {
77+
messageCount = 1;
78+
connection.start();
79+
80+
ActiveMQConnection amqConnection = (ActiveMQConnection) connection;
81+
rollbackCount = amqConnection.getRedeliveryPolicy().getMaximumRedeliveries() + 1;
82+
83+
makeConsumer();
84+
makeDlqConsumer();
85+
sendMessages();
86+
87+
for (int i = 0; i < messageCount; i++) {
88+
consumeAndRollback(i);
89+
}
90+
91+
for (int i = 0; i < messageCount; i++) {
92+
Message msg = dlqConsumer.receive(1000);
93+
assertNotNull("Should be a DLQ message for loop: " + i, msg);
94+
org.apache.activemq.command.Message commandMsg = (org.apache.activemq.command.Message ) msg;
95+
assertTrue(commandMsg.isPersistent());
96+
}
97+
98+
session.commit();
99+
}
100+
101+
public void validateMessagePersistentSetToTrueWhenProducerIsNonPeristent() throws Exception {
102+
messageCount = 1;
103+
destination = new ActiveMQQueue(NON_PERSISTENT_DEST);
104+
durableSubscriber = false;
105+
deliveryMode = DeliveryMode.NON_PERSISTENT;
106+
connection.start();
107+
108+
ActiveMQConnection amqConnection = (ActiveMQConnection) connection;
109+
rollbackCount = amqConnection.getRedeliveryPolicy().getMaximumRedeliveries() + 1;
110+
111+
makeConsumer();
112+
makeDlqConsumer();
113+
sendMessages();
114+
115+
for (int i = 0; i < messageCount; i++) {
116+
consumeAndRollback(i);
117+
}
118+
119+
dlqDestination = new ActiveMQQueue("DLQ." + NON_PERSISTENT_DEST);
120+
dlqConsumer = session.createConsumer(dlqDestination);
121+
122+
for (int i = 0; i < messageCount; i++) {
123+
Message msg = dlqConsumer.receive(1000);
124+
assertNotNull("Should be a DLQ message for loop: " + i, msg);
125+
assertEquals("NON_PERSISTENT", msg.getStringProperty("originalDeliveryMode"));
126+
org.apache.activemq.command.Message commandMsg = (org.apache.activemq.command.Message ) msg;
127+
assertTrue(commandMsg.isPersistent());
128+
}
129+
130+
session.commit();
131+
}
132+
133+
public void validateMessagePersitentNotSetWhenPreserveDeliveryModeIsTrue() throws Exception {
134+
messageCount = 1;
135+
destination = new ActiveMQQueue(PRESERVE_DELIVERY_DEST);
136+
durableSubscriber = false;
137+
deliveryMode = DeliveryMode.NON_PERSISTENT;
138+
connection.start();
139+
140+
ActiveMQConnection amqConnection = (ActiveMQConnection) connection;
141+
rollbackCount = amqConnection.getRedeliveryPolicy().getMaximumRedeliveries() + 1;
142+
143+
makeConsumer();
144+
makeDlqConsumer();
145+
sendMessages();
146+
147+
for (int i = 0; i < messageCount; i++) {
148+
consumeAndRollback(i);
149+
}
150+
151+
dlqDestination = new ActiveMQQueue("DLQ." + PRESERVE_DELIVERY_DEST);
152+
dlqConsumer = session.createConsumer(dlqDestination);
153+
154+
for (int i = 0; i < messageCount; i++) {
155+
Message msg = dlqConsumer.receive(1000);
156+
assertNotNull("Should be a DLQ message for loop: " + i, msg);
157+
org.apache.activemq.command.Message commandMsg = (org.apache.activemq.command.Message ) msg;
158+
assertFalse(commandMsg.isPersistent());
159+
}
160+
161+
session.commit();
162+
}
163+
}

0 commit comments

Comments
 (0)