Skip to content

Commit c792765

Browse files
committed
ARTEMIS-5483: jmsMaxTextMessageSize
Adds a new configuration option for Jakarta client consumers. When this configuration is set, the consumer will throw a javax.jms.JMSException on receiving large text(type 3) message that would exceed the specified size (in bytes) when creating javax.jms.TextMessage jmsMaxTextMessageSize is intended to prevent consumers of javax.jms.TextMessage from crashing due to out-of-memory when receiving messages that are larger than consumers memory. The configuration is provided as a URL property when establishing a connection to the broker. Example: Throw on large text messages exceeding 10MB in size: tcp://localhost:61616?jmsMaxTextMessageSize=10000000 Note: This option relies on how core client consumer works. When message is above certain size threshold (default 100KB) it will be considered as a large message and be delivered in parts with headers first. With the help of headers we're able to determine the size of the incoming body and reject if it is above jmsMaxTextMessageSize. If the message is below large message threshold then this option has no real defensive effect since the message will be read into memory anyway.
1 parent d846467 commit c792765

File tree

5 files changed

+141
-3
lines changed

5 files changed

+141
-3
lines changed

artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/ActiveMQJMSConstants.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,4 +34,7 @@ public class ActiveMQJMSConstants {
3434
public static final int INDIVIDUAL_ACKNOWLEDGE = 101;
3535

3636
public static final String JMS_ACTIVEMQ_ENABLE_BYTE_ARRAY_JMS_CORRELATION_ID_PROPERTY_NAME = "amq.jms.support-bytes-id";
37+
38+
public static final String JMS_MAX_TEXT_MESSAGE_SIZE = "jmsMaxTextMessageSize";
39+
3740
}

artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.security.AccessController;
2121
import java.security.PrivilegedAction;
2222
import java.util.HashSet;
23+
import java.util.Optional;
2324
import java.util.Set;
2425
import java.util.concurrent.ExecutorService;
2526
import java.util.concurrent.Executors;
@@ -54,6 +55,7 @@
5455
import org.apache.activemq.artemis.core.version.Version;
5556
import org.apache.activemq.artemis.reader.MessageUtil;
5657
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
58+
import org.apache.activemq.artemis.utils.ConfigurationHelper;
5759
import org.apache.activemq.artemis.utils.UUIDGenerator;
5860
import org.apache.activemq.artemis.utils.VersionLoader;
5961
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
@@ -134,6 +136,7 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme
134136

135137
private final ConnectionFactoryOptions options;
136138

139+
private Integer jmsMaxTextMessageSize;
137140

138141
public ActiveMQConnection(final ConnectionFactoryOptions options,
139142
final String username,
@@ -170,6 +173,16 @@ public ActiveMQConnection(final ConnectionFactoryOptions options,
170173
this.enable1xPrefixes = enable1xPrefixes;
171174

172175
creationStack = new Exception();
176+
177+
if (sessionFactory != null && sessionFactory.getConnectorConfiguration() != null) {
178+
int maxSize = ConfigurationHelper.getIntProperty(
179+
ActiveMQJMSConstants.JMS_MAX_TEXT_MESSAGE_SIZE,
180+
0,
181+
sessionFactory.getConnectorConfiguration().getExtraParams());
182+
if (maxSize > 0) {
183+
this.jmsMaxTextMessageSize = maxSize;
184+
}
185+
}
173186
}
174187

175188
/**
@@ -647,6 +660,10 @@ public void authorize(boolean validateClientId) throws JMSException {
647660
}
648661
}
649662

663+
public Optional<Integer> getJmsMaxTextMessageSize() {
664+
return Optional.ofNullable(jmsMaxTextMessageSize);
665+
}
666+
650667
private void addSessionMetaData(ClientSession session) throws ActiveMQException {
651668
session.addMetaData(ClientSession.JMS_SESSION_IDENTIFIER_PROPERTY, "");
652669
if (clientID != null) {
@@ -680,7 +697,6 @@ public String getDeserializationAllowList() {
680697
return this.factoryReference.getDeserializationAllowList();
681698
}
682699

683-
684700
private static class JMSFailureListener implements SessionFailureListener {
685701

686702
private final WeakReference<ActiveMQConnection> connectionRef;

artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@
3939
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
4040
import org.apache.activemq.artemis.jms.client.compatible1X.ActiveMQCompatibleMessage;
4141

42+
import java.util.Optional;
43+
44+
import static org.apache.activemq.artemis.api.core.Message.TEXT_TYPE;
45+
4246
/**
4347
* ActiveMQ Artemis implementation of a JMS MessageConsumer.
4448
*/
@@ -66,8 +70,6 @@ public final class ActiveMQMessageConsumer implements QueueReceiver, TopicSubscr
6670

6771
private final SimpleString autoDeleteQueueName;
6872

69-
70-
7173
protected ActiveMQMessageConsumer(final ConnectionFactoryOptions options,
7274
final ActiveMQConnection connection,
7375
final ActiveMQSession session,
@@ -216,6 +218,17 @@ private ActiveMQMessage getMessage(final long timeout, final boolean noWait) thr
216218
ActiveMQMessage jmsMsg = null;
217219

218220
if (coreMessage != null) {
221+
222+
Optional<Integer> jmsMaxTextMessageSize = connection.getJmsMaxTextMessageSize();
223+
if (jmsMaxTextMessageSize.isPresent()) {
224+
if (coreMessage.getType() == TEXT_TYPE && coreMessage.getBodySize() > jmsMaxTextMessageSize.get()) {
225+
String errorMsg = "The text message exceeds maximum set size of %d bytes.".formatted(jmsMaxTextMessageSize.get());
226+
ActiveMQException amqe = new ActiveMQException(errorMsg);
227+
ActiveMQClientLogger.LOGGER.unableToGetMessage(amqe);
228+
throw amqe;
229+
}
230+
}
231+
219232
ClientSession coreSession = session.getCoreSession();
220233
boolean needSession = ackMode == Session.CLIENT_ACKNOWLEDGE ||
221234
ackMode == ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE ||

tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/MessageConsumerTest.java

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import java.util.concurrent.atomic.AtomicBoolean;
5050
import java.util.concurrent.atomic.AtomicInteger;
5151

52+
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
5253
import org.apache.activemq.artemis.jms.tests.util.ProxyAssertSupport;
5354
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
5455
import org.apache.activemq.artemis.utils.Wait;
@@ -3670,6 +3671,92 @@ public void testExceptionMessageListenerStopSession() throws Exception {
36703671
}
36713672
}
36723673

3674+
@Test
3675+
public void testReceiveThrowsTextMessageAboveSetMaxSize() throws Exception {
3676+
Connection producerConnection = null;
3677+
3678+
Connection consumerConnection = null;
3679+
3680+
try {
3681+
String brokerURL = "tcp://127.0.0.1:61616?blockOnAcknowledge=true&blockOnDurableSend=true&blockOnNonDurableSend=true";
3682+
3683+
producerConnection = createConnection(new ActiveMQConnectionFactory(brokerURL));
3684+
3685+
var cf = new ActiveMQConnectionFactory(brokerURL + "&jmsMaxTextMessageSize=20");
3686+
cf.setMinLargeMessageSize(10);
3687+
consumerConnection = createConnection(
3688+
cf
3689+
);
3690+
3691+
Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
3692+
3693+
Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
3694+
3695+
MessageProducer queueProducer = producerSession.createProducer(queue1);
3696+
3697+
MessageConsumer queueConsumer = consumerSession.createConsumer(queue1);
3698+
3699+
consumerConnection.start();
3700+
3701+
TextMessage tm = producerSession.createTextMessage("Hello, world!");
3702+
3703+
queueProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
3704+
3705+
queueProducer.send(tm);
3706+
3707+
JMSException exception = ProxyAssertSupport.assertThrows(JMSException.class, queueConsumer::receive);
3708+
ProxyAssertSupport.assertEquals("The text message exceeds maximum set size of 20 bytes.", exception.getMessage());
3709+
} finally {
3710+
if (producerConnection != null) {
3711+
producerConnection.close();
3712+
}
3713+
if (consumerConnection != null) {
3714+
consumerConnection.close();
3715+
}
3716+
}
3717+
}
3718+
3719+
@Test
3720+
public void testReceiveDoesNotThrowWhenMessageIsBelowSetMaximuSize() throws Exception {
3721+
Connection producerConnection = null;
3722+
3723+
Connection consumerConnection = null;
3724+
3725+
try {
3726+
String brokerURL = "tcp://127.0.0.1:61616?blockOnAcknowledge=true&blockOnDurableSend=true&blockOnNonDurableSend=true";
3727+
3728+
producerConnection = createConnection(new ActiveMQConnectionFactory(brokerURL));
3729+
3730+
consumerConnection = createConnection(
3731+
new ActiveMQConnectionFactory(brokerURL + "&jmsMaxTextMessageSize=100")
3732+
);
3733+
3734+
Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
3735+
3736+
Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
3737+
3738+
MessageProducer queueProducer = producerSession.createProducer(queue1);
3739+
3740+
MessageConsumer queueConsumer = consumerSession.createConsumer(queue1);
3741+
3742+
consumerConnection.start();
3743+
3744+
TextMessage tm = producerSession.createTextMessage("Hello, world!");
3745+
3746+
queueProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
3747+
3748+
queueProducer.send(tm);
3749+
3750+
ProxyAssertSupport.assertDoesNotThrow(queueConsumer::receive);
3751+
} finally {
3752+
if (producerConnection != null) {
3753+
producerConnection.close();
3754+
}
3755+
if (consumerConnection != null) {
3756+
consumerConnection.close();
3757+
}
3758+
}
3759+
}
36733760

36743761
private class ConnectionCloseMessageListener implements MessageListener {
36753762

tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/util/ProxyAssertSupport.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.apache.activemq.artemis.jms.tests.util;
1818

1919
import org.junit.jupiter.api.Assertions;
20+
import org.junit.jupiter.api.function.Executable;
2021
import org.slf4j.Logger;
2122
import org.slf4j.LoggerFactory;
2223
import java.lang.invoke.MethodHandles;
@@ -341,4 +342,22 @@ public static void assertNotSame(final java.lang.Object object, final java.lang.
341342
throw e;
342343
}
343344
}
345+
346+
public static <T extends Throwable> T assertThrows(Class<T> expectedType, Executable executable) {
347+
try {
348+
return Assertions.assertThrows(expectedType, executable);
349+
} catch (AssertionError e) {
350+
logger.warn("AssertionFailure::", e);
351+
throw e;
352+
}
353+
}
354+
355+
public static void assertDoesNotThrow(Executable executable) {
356+
try {
357+
Assertions.assertDoesNotThrow(executable);
358+
} catch (AssertionError e) {
359+
logger.warn("AssertionFailure::", e);
360+
throw e;
361+
}
362+
}
344363
}

0 commit comments

Comments
 (0)