From dbb78fb4d9ee1cd9b55478351f1174197adabde9 Mon Sep 17 00:00:00 2001 From: Ken Liao Date: Mon, 9 Dec 2024 17:19:05 -0800 Subject: [PATCH 1/5] Implement Jakarta Messaging 3.1 feature, async send with CompletionListener. Added unit tests to cover behaviour required by specs. --- .../apache/activemq/ActiveMQConnection.java | 63 ++ .../activemq/ActiveMQMessageProducer.java | 78 ++- .../org/apache/activemq/ActiveMQProducer.java | 9 +- .../org/apache/activemq/ActiveMQSession.java | 163 +++++- .../apache/activemq/util/CountdownLock.java | 55 ++ .../jms2/ActiveMQJMS2AsyncSendTest.java | 544 ++++++++++++++++++ .../jms2/ActiveMQJMS2ContextTest.java | 22 +- 7 files changed, 902 insertions(+), 32 deletions(-) create mode 100644 activemq-client/src/main/java/org/apache/activemq/util/CountdownLock.java create mode 100644 activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2AsyncSendTest.java diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java index a91349b28b2..5075958c753 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -37,6 +37,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import jakarta.jms.CompletionListener; import jakarta.jms.Connection; import jakarta.jms.ConnectionConsumer; import jakarta.jms.ConnectionMetaData; @@ -1439,6 +1440,68 @@ public void onCompletion(FutureResponse resp) { } } + /** + * Send a packet through a Connection - for internal use only + * + * @param command + * + * @throws JMSException + */ + public void syncSendPacket(final Command command, final CompletionListener completionListener) throws JMSException { + if(completionListener==null) { + syncSendPacket(command); + } else { + if (isClosed()) { + throw new ConnectionClosedException(); + } + try { + this.transport.asyncRequest(command, resp -> { + Response response; + Throwable exception = null; + try { + response = resp.getResult(); + if (response.isException()) { + ExceptionResponse er = (ExceptionResponse)response; + exception = er.getException(); + } + } catch (Exception e) { + exception = e; + } + if (exception != null) { + if ( exception instanceof JMSException) { + completionListener.onException((jakarta.jms.Message) command, (JMSException) exception); + } else { + if (isClosed() || closing.get()) { + LOG.debug("Received an exception but connection is closing"); + } + JMSException jmsEx = null; + try { + jmsEx = JMSExceptionSupport.create(exception); + } catch(Throwable e) { + LOG.error("Caught an exception trying to create a JMSException for " +exception,e); + } + // dispose of transport for security exceptions on connection initiation + if (exception instanceof SecurityException && command instanceof ConnectionInfo){ + try { + forceCloseOnSecurityException(exception); + } catch (Throwable t) { + // We throw the original error from the ExceptionResponse instead. + } + } + if (jmsEx != null) { + completionListener.onException((jakarta.jms.Message) command, jmsEx); + } + } + } else { + completionListener.onCompletion((jakarta.jms.Message) command); + } + }); + } catch (IOException e) { + throw JMSExceptionSupport.create(e); + } + } + } + private void forceCloseOnSecurityException(Throwable exception) { LOG.trace("force close on security exception:{}, transport={}", this, transport, exception); onException(new IOException("Force close due to SecurityException on connect", exception)); diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java index 185ebffd41b..6b352c7705b 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java @@ -23,10 +23,10 @@ import jakarta.jms.CompletionListener; import jakarta.jms.Destination; import jakarta.jms.IllegalStateException; +import jakarta.jms.IllegalStateRuntimeException; import jakarta.jms.InvalidDestinationException; import jakarta.jms.JMSException; import jakarta.jms.Message; - import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ProducerAck; import org.apache.activemq.command.ProducerId; @@ -83,11 +83,13 @@ public class ActiveMQMessageProducer extends ActiveMQMessageProducerSupport impl private final long startTime; private MessageTransformer transformer; private MemoryUsage producerWindow; + private final ThreadLocal inCompletionListenerCallback = new ThreadLocal<>(); protected ActiveMQMessageProducer(ActiveMQSession session, ProducerId producerId, ActiveMQDestination destination, int sendTimeout) throws JMSException { super(session); this.info = new ProducerInfo(producerId); this.info.setWindowSize(session.connection.getProducerWindowSize()); + inCompletionListenerCallback.set(false); // Allows the options on the destination to configure the producerInfo if (destination != null && destination.getOptions() != null) { Map options = IntrospectionSupport.extractProperties( @@ -168,6 +170,9 @@ public Destination getDestination() throws JMSException { */ @Override public void close() throws JMSException { + if (inCompletionListenerCallback.get()) { + throw new IllegalStateRuntimeException("Can't close message producer within CompletionListener"); + } if (!closed) { dispose(); this.session.asyncSendPacket(info.createRemoveCommand()); @@ -239,27 +244,88 @@ public void send(Destination destination, Message message, int deliveryMode, int */ @Override public void send(Message message, CompletionListener completionListener) throws JMSException { - throw new UnsupportedOperationException("send(Message, CompletionListener) is not supported"); + this.send(getDestination(), + message, + defaultDeliveryMode, + defaultPriority, + defaultTimeToLive, + completionListener); } + @Override public void send(Message message, int deliveryMode, int priority, long timeToLive, CompletionListener completionListener) throws JMSException { - throw new UnsupportedOperationException("send(Message, deliveryMode, priority, timetoLive, CompletionListener) is not supported"); + this.send(this.getDestination(), + message, + deliveryMode, + priority, + timeToLive, + completionListener); } @Override public void send(Destination destination, Message message, CompletionListener completionListener) throws JMSException { - throw new UnsupportedOperationException("send(Destination, Message, CompletionListener) is not supported"); + this.send(destination, + message, + defaultDeliveryMode, + defaultPriority, + defaultTimeToLive, + completionListener); } @Override public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, CompletionListener completionListener) throws JMSException { - throw new UnsupportedOperationException("send(Destination, Message, deliveryMode, priority, timetoLive, CompletionListener) is not supported"); + this.send(destination, message, deliveryMode, priority, timeToLive, + getDisableMessageID(), getDisableMessageTimestamp(), completionListener); + } + + public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, + boolean disableMessageID, boolean disableMessageTimestamp, CompletionListener completionListener) throws JMSException { + checkClosed(); + if (destination == null) { + if (info.getDestination() == null) { + throw new UnsupportedOperationException("A destination must be specified."); + } + throw new InvalidDestinationException("Don't understand null destinations"); + } + + ActiveMQDestination dest; + if (destination.equals(info.getDestination())) { + dest = (ActiveMQDestination)destination; + } else if (info.getDestination() == null) { + dest = ActiveMQDestination.transform(destination); + } else { + throw new UnsupportedOperationException("This producer can only send messages to: " + this.info.getDestination().getPhysicalName()); + } + if (dest == null) { + throw new JMSException("No destination specified"); + } + + if (transformer != null) { + Message transformedMessage = transformer.producerTransform(session, this, message); + if (transformedMessage != null) { + message = transformedMessage; + } + } + + if (producerWindow != null) { + try { + producerWindow.waitForSpace(); + } catch (InterruptedException e) { + throw new JMSException("Send aborted due to thread interrupt."); + } + } + + this.session.send(this, dest, message, deliveryMode, priority, timeToLive, disableMessageID, + disableMessageTimestamp, producerWindow, sendTimeout, completionListener, inCompletionListenerCallback); + + stats.onMessage(); } - public void send(Message message, AsyncCallback onComplete) throws JMSException { + + public void send(Message message, AsyncCallback onComplete) throws JMSException { this.send(this.getDestination(), message, this.defaultDeliveryMode, diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQProducer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQProducer.java index dc7311981e4..fd316efef6b 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQProducer.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQProducer.java @@ -56,6 +56,7 @@ public class ActiveMQProducer implements JMSProducer { // Properties applied to all messages on a per-JMS producer instance basis private Map messageProperties = null; + private CompletionListener completionListener = null; ActiveMQProducer(ActiveMQContext activemqContext, ActiveMQMessageProducer activemqMessageProducer) { this.activemqContext = activemqContext; @@ -86,8 +87,7 @@ public JMSProducer send(Destination destination, Message message) { message.setObjectProperty(propertyEntry.getKey(), propertyEntry.getValue()); } } - - activemqMessageProducer.send(destination, message, getDeliveryMode(), getPriority(), getTimeToLive(), getDisableMessageID(), getDisableMessageTimestamp(), null); + activemqMessageProducer.send(destination, message, getDeliveryMode(), getPriority(), getTimeToLive(), getDisableMessageID(), getDisableMessageTimestamp(), getAsync()); } catch (JMSException e) { throw JMSExceptionSupport.convertToJMSRuntimeException(e); } @@ -253,12 +253,13 @@ public long getDeliveryDelay() { @Override public JMSProducer setAsync(CompletionListener completionListener) { - throw new UnsupportedOperationException("setAsync(CompletionListener) is not supported"); + this.completionListener = completionListener; + return this; } @Override public CompletionListener getAsync() { - throw new UnsupportedOperationException("getAsync() is not supported"); + return this.completionListener; } @Override diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java index 766005f2872..c0a2a2ac3a9 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java @@ -31,8 +31,10 @@ import java.util.concurrent.atomic.AtomicInteger; import jakarta.jms.BytesMessage; +import jakarta.jms.CompletionListener; import jakarta.jms.Destination; import jakarta.jms.IllegalStateException; +import jakarta.jms.IllegalStateRuntimeException; import jakarta.jms.InvalidDestinationException; import jakarta.jms.InvalidSelectorException; import jakarta.jms.JMSException; @@ -57,7 +59,6 @@ import jakarta.jms.TopicSession; import jakarta.jms.TopicSubscriber; import jakarta.jms.TransactionRolledBackException; - import org.apache.activemq.blob.BlobDownloader; import org.apache.activemq.blob.BlobTransferPolicy; import org.apache.activemq.blob.BlobUploader; @@ -93,6 +94,7 @@ import org.apache.activemq.transaction.Synchronization; import org.apache.activemq.usage.MemoryUsage; import org.apache.activemq.util.Callback; +import org.apache.activemq.util.CountdownLock; import org.apache.activemq.util.LongSequenceGenerator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -234,6 +236,8 @@ public static interface DeliveryListener { private MessageTransformer transformer; private BlobTransferPolicy blobTransferPolicy; private long lastDeliveredSequenceId = -2; + private CountdownLock numIncompletedAsyncSend = new CountdownLock(); + private ThreadLocal inCompletionListenerCallback = new ThreadLocal<>(); /** * Construct the Session @@ -253,6 +257,7 @@ protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, in this.asyncDispatch = asyncDispatch; this.sessionAsyncDispatch = sessionAsyncDispatch; this.info = new SessionInfo(connection.getConnectionInfo(), sessionId.getValue()); + inCompletionListenerCallback.set(false); setTransactionContext(new TransactionContext(connection)); stats = new JMSSessionStatsImpl(producers, consumers); this.connection.asyncSendPacket(info); @@ -576,12 +581,16 @@ public int getAcknowledgeMode() throws JMSException { @Override public void commit() throws JMSException { checkClosed(); + if (inCompletionListenerCallback.get()) { + throw new IllegalStateRuntimeException("Can't commit transacted session within CompletionListener"); + } if (!getTransacted()) { throw new jakarta.jms.IllegalStateException("Not a transacted session"); } if (LOG.isDebugEnabled()) { LOG.debug(getSessionId() + " Transaction Commit :" + transactionContext.getTransactionId()); } + waitForAsyncSendToFinish(); transactionContext.commit(); } @@ -597,12 +606,16 @@ public void commit() throws JMSException { @Override public void rollback() throws JMSException { checkClosed(); + if (inCompletionListenerCallback.get()) { + throw new IllegalStateRuntimeException("Can't rollback transacted session within CompletionListener"); + } if (!getTransacted()) { throw new jakarta.jms.IllegalStateException("Not a transacted session"); } if (LOG.isDebugEnabled()) { LOG.debug(getSessionId() + " Transaction Rollback, txid:" + transactionContext.getTransactionId()); } + waitForAsyncSendToFinish(); transactionContext.rollback(); } @@ -637,6 +650,9 @@ public void rollback() throws JMSException { @Override public void close() throws JMSException { if (!closed) { + if (inCompletionListenerCallback.get()) { + throw new IllegalStateRuntimeException("Can't close session within CompletionListener"); + } if (getTransactionContext().isInXATransaction()) { if (!synchronizationRegistered) { synchronizationRegistered = true; @@ -722,7 +738,8 @@ void deliverAcks() { public synchronized void dispose() throws JMSException { if (!closed) { - + // Wait for Incompleted async send to finish per Jakarta Messaging 3.1 section 3.7.4 + waitForAsyncSendToFinish(); try { executor.close(); @@ -2049,6 +2066,144 @@ protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destin } } + /** + * Sends the message for dispatch by the broker. + * + * @param producer - message producer. + * @param destination - message destination. + * @param message - message to be sent. + * @param deliveryMode - JMS message delivery mode. + * @param priority - message priority. + * @param timeToLive - message expiration. + * @param disableTimestamp - disable timestamp. + * @param disableMessageID - optionally, disable messageID. + * @param producerWindow + * @param completionListener + * @param producerInCompletionListenerCallback + * @throws JMSException + */ + protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive, + boolean disableMessageID, boolean disableMessageTimestamp, MemoryUsage producerWindow, int sendTimeout, + CompletionListener completionListener, ThreadLocal producerInCompletionListenerCallback) throws JMSException { + + checkClosed(); + if (destination.isTemporary() && connection.isDeleted(destination)) { + throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination); + } + synchronized (sendMutex) { + // tell the Broker we are about to start a new transaction + doStartTransaction(); + if (transactionContext.isRollbackOnly()) { + throw new IllegalStateException("transaction marked rollback only"); + } + TransactionId txid = transactionContext.getTransactionId(); + long sequenceNumber = producer.getMessageSequence(); + + //Set the "JMS" header fields on the original message, see 1.1 spec section 3.4.11 + message.setJMSDeliveryMode(deliveryMode); + long expiration = 0L; + long timeStamp = System.currentTimeMillis(); + if (timeToLive > 0) { + expiration = timeToLive + timeStamp; + } + + if(!(message instanceof ActiveMQMessage)) { + setForeignMessageDeliveryTime(message, timeStamp); + } else { + message.setJMSDeliveryTime(timeStamp); + } + if (!disableMessageTimestamp && !producer.getDisableMessageTimestamp()) { + message.setJMSTimestamp(timeStamp); + } else { + message.setJMSTimestamp(0l); + } + message.setJMSExpiration(expiration); + message.setJMSPriority(priority); + message.setJMSRedelivered(false); + + // transform to our own message format here + ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection); + msg.setDestination(destination); + msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber)); + + // Set the message id. + if (msg != message) { + message.setJMSMessageID(msg.getMessageId().toString()); + // Make sure the JMS destination is set on the foreign messages too. + message.setJMSDestination(destination); + } + //clear the brokerPath in case we are re-sending this message + msg.setBrokerPath(null); + + msg.setTransactionId(txid); + if (connection.isCopyMessageOnSend()) { + msg = (ActiveMQMessage)msg.copy(); + } + msg.setConnection(connection); + msg.onSend(); + msg.setProducerId(msg.getMessageId().getProducerId()); + if (LOG.isTraceEnabled()) { + LOG.trace(getSessionId() + " sending message: " + msg); + } + if (completionListener==null && sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) { + this.connection.asyncSendPacket(msg); + if (producerWindow != null) { + // Since we defer lots of the marshaling till we hit the + // wire, this might not + // provide and accurate size. We may change over to doing + // more aggressive marshaling, + // to get more accurate sizes.. this is more important once + // users start using producer window + // flow control. + int size = msg.getSize(); + producerWindow.increaseUsage(size); + } + } else { + if (sendTimeout > 0 && completionListener==null) { + this.connection.syncSendPacket(msg, sendTimeout); + }else { + CompletionListener wrapperCompletionListener = null; + if (completionListener != null) { + // Make the Message object unaccessible and unmutable + // per Jakarta Messaging 3.1 spec section 7.3.9 and 7.3.6 + numIncompletedAsyncSend.doIncrement(); + wrapperCompletionListener = new CompletionListener() { + @Override + public void onCompletion(Message message) { + try { + inCompletionListenerCallback.set(true); + producerInCompletionListenerCallback.set(true); + numIncompletedAsyncSend.doDecrement(); + completionListener.onCompletion(message); + } catch (Exception e) { + // invoke onException if the exception can't be thrown in the thread that calls the send + // per Jakarta Messaging 3.1 spec section 7.3.2 + completionListener.onException(message, e); + } finally { + inCompletionListenerCallback.set(false); + producerInCompletionListenerCallback.set(false); + } + } + + @Override + public void onException(Message message, Exception e) { + try { + inCompletionListenerCallback.set(true); + completionListener.onException(message, e); + } finally { + numIncompletedAsyncSend.doDecrement(); + inCompletionListenerCallback.set(false); + } + } + }; + } + this.connection.syncSendPacket(msg, wrapperCompletionListener); + } + } + + } + } + /** * Send TransactionInfo to indicate transaction has started * @@ -2339,4 +2494,8 @@ private static void setForeignMessageDeliveryTime(final Message foreignMessage, foreignMessage.setJMSDeliveryTime(deliveryTime); } } + + private void waitForAsyncSendToFinish() { + numIncompletedAsyncSend.doWaitForZero(); + } } diff --git a/activemq-client/src/main/java/org/apache/activemq/util/CountdownLock.java b/activemq-client/src/main/java/org/apache/activemq/util/CountdownLock.java new file mode 100644 index 00000000000..f76842aa90e --- /dev/null +++ b/activemq-client/src/main/java/org/apache/activemq/util/CountdownLock.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.util; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + * This concurrent data structure is used when the calling thread wants to wait until a counter gets to 0 but the counter + * can go up and down (unlike a CountDownLatch which can only count down) + */ +public class CountdownLock { + + final Object counterMonitor = new Object(); + private final AtomicInteger counter = new AtomicInteger(); + + public void doWaitForZero() { + synchronized(counterMonitor){ + try { + if (counter.get() > 0) { + counterMonitor.wait(); + } + } catch (InterruptedException e) { + return; + } + } + } + + public void doDecrement() { + synchronized(counterMonitor){ + if (counter.decrementAndGet() == 0) { + counterMonitor.notify(); + } + } + } + + public void doIncrement() { + synchronized(counterMonitor){ + counter.incrementAndGet(); + } + } +} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2AsyncSendTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2AsyncSendTest.java new file mode 100644 index 00000000000..010555c4c62 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2AsyncSendTest.java @@ -0,0 +1,544 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.jms2; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; + +import java.util.ArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import jakarta.jms.CompletionListener; +import jakarta.jms.Destination; +import jakarta.jms.JMSContext; +import jakarta.jms.JMSException; +import jakarta.jms.JMSProducer; +import jakarta.jms.Message; +import jakarta.jms.MessageConsumer; +import jakarta.jms.Session; +import jakarta.jms.TextMessage; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ActiveMQJMS2AsyncSendTest extends ActiveMQJMS2TestBase{ + + private static final Logger log = LoggerFactory.getLogger(ActiveMQJMS2AsyncSendTest.class); + + @Test + public void testSendMessageWithSessionApi_spec_7_3_1() throws Exception { + // https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#quality-of-service + CountDownLatch latch = new CountDownLatch(1); + CompletionListener completionListener = new CompletionListener() { + + @Override + public void onCompletion(Message message) { + latch.countDown(); + } + + @Override + public void onException(Message message, Exception e) { + throw new RuntimeException(e); + } + }; + + messageProducer.send( + session.createQueue(methodNameDestinationName), + session.createTextMessage("Test-" + methodNameDestinationName), + completionListener); + boolean status = latch.await(10L, TimeUnit.SECONDS); + if (!status) { + fail("the completion listener was not triggered within 10 seconds or threw an exception"); + } + } + + @Test + public void testSendMessageWithContextApi_spec_7_3_1() throws Exception { + // https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#quality-of-service + try(JMSContext jmsContext = activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS, Session.AUTO_ACKNOWLEDGE)) { + assertNotNull(jmsContext); + JMSProducer jmsProducer = jmsContext.createProducer(); + Destination destination = jmsContext.createQueue(methodNameDestinationName); + String textBody = "Test-" + methodNameDestinationName; + jmsContext.start(); + CountDownLatch latch = new CountDownLatch(1); + CompletionListener completionListener = new CompletionListener() { + + @Override + public void onCompletion(Message message) { + latch.countDown(); + } + + @Override + public void onException(Message message, Exception e) { + throw new RuntimeException(e); + } + }; + jmsProducer.setAsync(completionListener); + jmsProducer.send(destination, textBody); + boolean status = latch.await(10L, TimeUnit.SECONDS); + if (!status) { + fail("the completion listener was not triggered within 10 seconds or threw an exception"); + } + } catch (Exception e) { + fail(e.getMessage()); + } + } + + @Test + public void testOnExceptionTriggered_spec_7_3_2() throws Exception { + // https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#exceptions + try(JMSContext jmsContext = activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS, Session.AUTO_ACKNOWLEDGE)) { + assertNotNull(jmsContext); + JMSProducer jmsProducer = jmsContext.createProducer(); + Destination destination = jmsContext.createQueue(methodNameDestinationName); + String textBody = "Test-" + methodNameDestinationName; + jmsContext.start(); + CountDownLatch latch = new CountDownLatch(1); + CompletionListener completionListener = new CompletionListener() { + + @Override + public void onCompletion(Message message) throws RuntimeException { + throw new RuntimeException("throw runtime exception"); + } + + @Override + public void onException(Message message, Exception e) { + latch.countDown(); + } + }; + + jmsProducer.setAsync(completionListener); + jmsProducer.send(destination, textBody); + boolean status = latch.await(10L, TimeUnit.SECONDS); + if (!status) { + fail("the completion listener onException method was not triggered within 10 seconds"); + } + } catch (Exception e) { + fail(e.getMessage()); + } + } + + @Test + public void testCorrectMessageOrder_spec7_3_3() throws Exception { + // https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#message-order-2 + try(JMSContext jmsContext = activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS, Session.AUTO_ACKNOWLEDGE)) { + assertNotNull(jmsContext); + JMSProducer jmsProducer = jmsContext.createProducer(); + Destination destination = jmsContext.createQueue(methodNameDestinationName); + ArrayList expectedOrderedMessages = new ArrayList<>(); + ArrayList actualOrderedMessages = new ArrayList<>(); + Object mutex = new Object(); + jmsContext.start(); + int num_msgs = 100; + CountDownLatch latch = new CountDownLatch(num_msgs); + CompletionListener completionListener = new CompletionListener() { + @Override + public void onCompletion(Message message) { + synchronized (mutex) { + try { + String text = ((TextMessage) message).getText(); + actualOrderedMessages.add(text); + } catch (JMSException e) { + throw new RuntimeException(e); + } + } + latch.countDown(); + } + + @Override + public void onException(Message message, Exception e) { + throw new RuntimeException(e); + } + }; + + jmsProducer.setAsync(completionListener); + for (int i = 0; i < num_msgs; i++) { + String textBody = "Test-" + methodNameDestinationName + "-" + String.valueOf(i); + expectedOrderedMessages.add(textBody); + jmsProducer.send(destination, textBody); + } + boolean status = latch.await(10L, TimeUnit.SECONDS); + if (!status) { + fail("the completion listener was not triggered within 10 seconds or threw an exception"); + } + for (int i = 0; i < num_msgs; i++) { + String got = actualOrderedMessages.get(i); + String expected = expectedOrderedMessages.get(i); + if (!got.equals(expected)) { + fail(String.format("Message out of order. Got %s but expected %s", got, expected)); + } + } + } catch (Exception e) { + fail(e.getMessage()); + } + } + + @Test + public void testUnableToCloseContextInCompletionListener_spec_7_3_4() throws Exception { + // https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#close-commit-or-rollback + try(JMSContext jmsContext = activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS, Session.AUTO_ACKNOWLEDGE)) { + assertNotNull(jmsContext); + JMSProducer jmsProducer = jmsContext.createProducer(); + Destination destination = jmsContext.createQueue(methodNameDestinationName); + String textBody = "Test-" + methodNameDestinationName; + jmsContext.start(); + CountDownLatch latch = new CountDownLatch(1); + CompletionListener completionListener = new CompletionListener() { + + @Override + public void onCompletion(Message message) { + jmsContext.close(); // This should cause a RuntimeException to throw and trigger the onException + } + + @Override + public void onException(Message message, Exception e) { + latch.countDown(); + } + }; + + jmsProducer.setAsync(completionListener); + jmsProducer.send(destination, textBody); + boolean status = latch.await(10L, TimeUnit.SECONDS); + if (!status) { + fail("the completion listener onException was not triggered within 10 seconds or threw an exception"); + } + } catch (Exception e) { + fail(e.getMessage()); + } + } + + @Test + public void testUnableToCloseProducerInCompletionListener_spec_7_3_4() throws Exception { + // https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#close-commit-or-rollback + CountDownLatch latch = new CountDownLatch(1); + CompletionListener completionListener = new CompletionListener() { + @Override + public void onCompletion(Message message) { + try { + messageProducer.close(); // This should cause a RuntimeException to throw and trigger the onException + } catch (JMSException e) { + throw new RuntimeException(e); + } + } + + @Override + public void onException(Message message, Exception e) { + latch.countDown(); + } + }; + messageProducer.send(session.createQueue(methodNameDestinationName), + session.createTextMessage("Test-" + methodNameDestinationName), completionListener); + boolean status = latch.await(10L, TimeUnit.SECONDS); + if (!status) { + fail("the completion listener onException was not triggered within 10 seconds or threw an exception"); + } + } + + @Test + public void testUnableToCommitTransactionInCompletionListener_spec_7_3_4() throws Exception { + // https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#close-commit-or-rollback + try(JMSContext jmsContext = activemqConnectionFactory.createContext( + DEFAULT_JMS_USER, DEFAULT_JMS_PASS, Session.SESSION_TRANSACTED)) { + assertNotNull(jmsContext); + JMSProducer jmsProducer = jmsContext.createProducer(); + Destination destination = jmsContext.createQueue(methodNameDestinationName); + String textBody = "Test-" + methodNameDestinationName; + jmsContext.start(); + CountDownLatch latch = new CountDownLatch(1); + CompletionListener completionListener = new CompletionListener() { + + @Override + public void onCompletion(Message message) { + jmsContext.commit(); // This should cause a RuntimeException to throw and trigger the onException + } + + @Override + public void onException(Message message, Exception e) { + latch.countDown(); + } + }; + + jmsProducer.setAsync(completionListener); + jmsProducer.send(destination, textBody); + boolean status = latch.await(10L, TimeUnit.SECONDS); + if (!status) { + fail("the completion listener onException was not triggered within 10 seconds or threw an exception"); + } + } catch (Exception e) { + fail(e.getMessage()); + } + } + + @Test + public void testUnableToRollbackTransactionInCompletionListener_spec_7_3_4() throws Exception { + // https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#close-commit-or-rollback + try(JMSContext jmsContext = activemqConnectionFactory.createContext( + DEFAULT_JMS_USER, DEFAULT_JMS_PASS, Session.SESSION_TRANSACTED)) { + assertNotNull(jmsContext); + JMSProducer jmsProducer = jmsContext.createProducer(); + Destination destination = jmsContext.createQueue(methodNameDestinationName); + String textBody = "Test-" + methodNameDestinationName; + jmsContext.start(); + CountDownLatch latch = new CountDownLatch(1); + CompletionListener completionListener = new CompletionListener() { + + @Override + public void onCompletion(Message message) { + jmsContext.rollback(); // This should cause a RuntimeException to throw and trigger the onException + } + + @Override + public void onException(Message message, Exception e) { + latch.countDown(); + } + }; + + jmsProducer.setAsync(completionListener); + jmsProducer.send(destination, textBody); + boolean status = latch.await(10L, TimeUnit.SECONDS); + if (!status) { + fail("the completion listener onException was not triggered within 10 seconds or threw an exception"); + } + } catch (Exception e) { + fail(e.getMessage()); + } + } + + @Test + public void testCloseContextWailUntilAllIncompleteSentToFinish_spec_7_3_4() throws Exception { + // https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#close-commit-or-rollback + try(JMSContext jmsContext = activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS, Session.AUTO_ACKNOWLEDGE)) { + assertNotNull(jmsContext); + JMSProducer jmsProducer = jmsContext.createProducer(); + Destination destination = jmsContext.createQueue(methodNameDestinationName); + ArrayList expectedOrderedMessages = new ArrayList<>(); + ArrayList actualOrderedMessages = new ArrayList<>(); + Object mutex = new Object(); + jmsContext.start(); + int num_msgs = 100; + CompletionListener completionListener = new CompletionListener() { + @Override + public void onCompletion(Message message) { + synchronized (mutex) { + try { + String text = ((TextMessage) message).getText(); + actualOrderedMessages.add(text); + } catch (JMSException e) { + throw new RuntimeException(e); + } + } + } + + @Override + public void onException(Message message, Exception e) { + throw new RuntimeException(e); + } + }; + + jmsProducer.setAsync(completionListener); + for (int i = 0; i < num_msgs; i++) { + String textBody = "Test-" + methodNameDestinationName + "-" + String.valueOf(i); + expectedOrderedMessages.add(textBody); + jmsProducer.send(destination, textBody); + } + jmsContext.close(); + if (expectedOrderedMessages.size() != actualOrderedMessages.size()) { + fail("jmsContext doesn't wait until all inComplete send to finish"); + } + } catch (Exception e) { + fail(e.getMessage()); + } + } + + @Test + public void testCommitContextWailUntilAllIncompleteSentToFinish_spec_7_3_4() throws Exception { + // https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#close-commit-or-rollback + try(JMSContext jmsContext = activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS, Session.SESSION_TRANSACTED)) { + assertNotNull(jmsContext); + JMSProducer jmsProducer = jmsContext.createProducer(); + Destination destination = jmsContext.createQueue(methodNameDestinationName); + ArrayList expectedOrderedMessages = new ArrayList<>(); + ArrayList actualOrderedMessages = new ArrayList<>(); + Object mutex = new Object(); + jmsContext.start(); + int num_msgs = 100; + CompletionListener completionListener = new CompletionListener() { + @Override + public void onCompletion(Message message) { + synchronized (mutex) { + try { + String text = ((TextMessage) message).getText(); + actualOrderedMessages.add(text); + } catch (JMSException e) { + throw new RuntimeException(e); + } + } + } + + @Override + public void onException(Message message, Exception e) { + throw new RuntimeException(e); + } + }; + + jmsProducer.setAsync(completionListener); + for (int i = 0; i < num_msgs; i++) { + String textBody = "Test-" + methodNameDestinationName + "-" + String.valueOf(i); + expectedOrderedMessages.add(textBody); + jmsProducer.send(destination, textBody); + } + jmsContext.commit(); + if (expectedOrderedMessages.size() != actualOrderedMessages.size()) { + fail("jmsContext doesn't wait until all inComplete send to finish"); + } + } catch (Exception e) { + fail(e.getMessage()); + } + } + + @Test + public void testAbleToAccessMessageHeaderAfterAsyncSendCompleted_spec7_3_6_spec7_3_9() throws Exception { + // https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#message-headers + // We won't throw exception because it's optional as stated in the spec. + // "If the Jakarta Messaging provider does not throw an exception then the behaviour is undefined." + try(JMSContext jmsContext = activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS, Session.AUTO_ACKNOWLEDGE)) { + assertNotNull(jmsContext); + JMSProducer jmsProducer = jmsContext.createProducer(); + Destination destination = jmsContext.createQueue(methodNameDestinationName); + String textBody = "Test-" + methodNameDestinationName; + jmsContext.start(); + CountDownLatch latch = new CountDownLatch(1); + CompletionListener completionListener = new CompletionListener() { + + @Override + public void onCompletion(Message message) { + try { + if (!((TextMessage) message).getText().equals(textBody)) { + log.error("messages don't match"); + throw new RuntimeException("messages don't match"); + } + } catch (JMSException e) { + throw new RuntimeException(e); + } + latch.countDown(); + } + + @Override + public void onException(Message message, Exception e) { + throw new RuntimeException(e); + } + }; + jmsProducer.setAsync(completionListener); + TextMessage message = jmsContext.createTextMessage(); + message.setText(textBody); + jmsProducer.send(destination, message); + // Trying to get the message header + int deliveryMode = message.getJMSDeliveryMode(); + boolean status = latch.await(10L, TimeUnit.SECONDS); + if (!status) { + fail("the completion listener was not triggered within 10 seconds or threw an exception"); + } + } catch (Exception e) { + fail(e.getMessage()); + } + } + + @Test + public void testCompletionListenerThreadingRestriction_spec7_3_7() throws Exception { + // (https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#restrictions-on-threading) + // The session can continue to be used by current application thread. Session is used one thread at a time (CompletionListener, Application thread ... etc) + CountDownLatch latch = new CountDownLatch(1); + CompletionListener completionListener = new CompletionListener() { + @Override + public void onCompletion(Message message) { + try { + // Simulate busy processing of the message for 5 seconds. + Thread.sleep(5 * 1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + latch.countDown(); + } + + @Override + public void onException(Message message, Exception e) { + throw new RuntimeException(e); + } + }; + + messageProducer.send( + session.createQueue(methodNameDestinationName), + session.createTextMessage("Test-" + methodNameDestinationName), + completionListener); + MessageConsumer consumer = session.createConsumer(session.createQueue(methodNameDestinationName)); + Message msg = consumer.receive(2 * 1000); + if (msg == null) { + fail("session in the original thread of control was dedicated to the thread of control of CompletionListener"); + } + String gotTextBody = ((TextMessage) msg).getText(); + if (!gotTextBody.equals("Test-" + methodNameDestinationName)) { + fail("receive message is different than the one originally sent"); + } + boolean status = latch.await(10L, TimeUnit.SECONDS); + if (!status) { + fail("the completion listener was not triggered within 10 seconds or threw an exception"); + } + } + + @Test + public void testCompletionListenerInvokedInDifferentThread_spec7_3_8() throws Exception { + // https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#use-of-the-completionlistener-by-the-jakarta-messaging-provider + // The CompletionListener has to be invoked in different thread + try(JMSContext jmsContext = activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS, Session.AUTO_ACKNOWLEDGE)) { + assertNotNull(jmsContext); + JMSProducer jmsProducer = jmsContext.createProducer(); + Destination destination = jmsContext.createQueue(methodNameDestinationName); + String textBody = "Test-" + methodNameDestinationName; + jmsContext.start(); + String testThreadName = Thread.currentThread().getName(); + CountDownLatch latch = new CountDownLatch(1); + CompletionListener completionListener = new CompletionListener() { + + @Override + public void onCompletion(Message message) { + String onCompletionThreadName = Thread.currentThread().getName(); + if (!onCompletionThreadName.equals(testThreadName)) { + latch.countDown(); + } else { + log.error("onCompletion is executed in the same thread as the application thread."); + } + } + + @Override + public void onException(Message message, Exception e) { + throw new RuntimeException(e); + } + }; + + jmsProducer.setAsync(completionListener); + jmsProducer.send(destination, textBody); + boolean status = latch.await(10L, TimeUnit.SECONDS); + if (!status) { + fail("the completion listener was not triggered within 10 seconds or threw an exception"); + } + } catch (Exception e) { + fail(e.getMessage()); + } + } +} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2ContextTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2ContextTest.java index cf1e0fa594c..68b153d9ddd 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2ContextTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2ContextTest.java @@ -23,6 +23,8 @@ import static org.junit.Assert.fail; import java.util.Enumeration; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import jakarta.jms.CompletionListener; import jakarta.jms.Destination; @@ -296,26 +298,6 @@ public void testProducerDeliveryDelaySet() throws JMSException { messageProducer.setDeliveryDelay(1000l); } - @Test(expected = UnsupportedOperationException.class) - public void testProducerSendMessageCompletionListener() throws JMSException { - messageProducer.send(session.createQueue(methodNameDestinationName), null, (CompletionListener)null); - } - - @Test(expected = UnsupportedOperationException.class) - public void testProducerSendMessageQoSParamsCompletionListener() throws JMSException { - messageProducer.send(null, 1, 4, 0l, null); - } - - @Test(expected = UnsupportedOperationException.class) - public void testProducerSendDestinationMessageCompletionListener() throws JMSException { - messageProducer.send(session.createQueue(methodNameDestinationName), null, null); - } - - @Test(expected = UnsupportedOperationException.class) - public void testProducerSendDestinationMessageQosParamsCompletionListener() throws JMSException { - messageProducer.send(session.createQueue(methodNameDestinationName), null, 1, 4, 0l, null); - } - protected static void sendMessage(JMSContext jmsContext, Destination testDestination, String textBody) { assertNotNull(jmsContext); JMSProducer jmsProducer = jmsContext.createProducer(); From a5d8ef1fe6f4869bbe318fedf154c44f61d55039 Mon Sep 17 00:00:00 2001 From: Ken Liao Date: Sat, 14 Dec 2024 15:55:00 -0800 Subject: [PATCH 2/5] Implement 7.3.6 and 7.3.9 - restriction on Message object --- .../activemq/broker/jmx/OpenTypeSupport.java | 25 +- .../org/apache/activemq/ActiveMQSession.java | 10 +- .../activemq/command/ActiveMQMessage.java | 103 +++++-- .../org/apache/activemq/command/Message.java | 2 +- .../transport/mqtt/MQTTPacketIdGenerator.java | 5 +- .../transport/stomp/FrameTranslator.java | 2 +- .../org/apache/activemq/bugs/AMQ4893Test.java | 2 +- .../activemq/command/ActiveMQMessageTest.java | 266 +++++++++++++++++- .../jms2/ActiveMQJMS2AsyncSendTest.java | 26 +- 9 files changed, 380 insertions(+), 61 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java index cf0dc4d9178..a6f1232eb22 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java @@ -159,16 +159,21 @@ protected void init() throws OpenDataException { public Map getFields(Object o) throws OpenDataException { ActiveMQMessage m = (ActiveMQMessage)o; Map rc = super.getFields(o); - rc.put("JMSCorrelationID", m.getJMSCorrelationID()); - rc.put("JMSDestination", "" + m.getJMSDestination()); - rc.put("JMSMessageID", m.getJMSMessageID()); - rc.put("JMSReplyTo",toString(m.getJMSReplyTo())); - rc.put("JMSType", m.getJMSType()); - rc.put("JMSDeliveryMode", m.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ? "PERSISTENT" : "NON-PERSISTENT"); - rc.put("JMSExpiration", m.getJMSExpiration()); - rc.put("JMSPriority", m.getJMSPriority()); - rc.put("JMSRedelivered", m.getJMSRedelivered()); - rc.put("JMSTimestamp", new Date(m.getJMSTimestamp())); + try { + rc.put("JMSCorrelationID", m.getJMSCorrelationID()); + rc.put("JMSDestination", "" + m.getJMSDestination()); + rc.put("JMSMessageID", m.getJMSMessageID()); + rc.put("JMSReplyTo",toString(m.getJMSReplyTo())); + rc.put("JMSType", m.getJMSType()); + rc.put("JMSDeliveryMode", m.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ? "PERSISTENT" : "NON-PERSISTENT"); + rc.put("JMSExpiration", m.getJMSExpiration()); + rc.put("JMSPriority", m.getJMSPriority()); + rc.put("JMSRedelivered", m.getJMSRedelivered()); + rc.put("JMSTimestamp", new Date(m.getJMSTimestamp())); + } catch (JMSException e) { + throw new OpenDataException(e.getMessage()); + } + rc.put(CompositeDataConstants.JMSXGROUP_ID, m.getGroupID()); rc.put(CompositeDataConstants.JMSXGROUP_SEQ, m.getGroupSequence()); rc.put(CompositeDataConstants.JMSXUSER_ID, m.getUserID()); diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java index c0a2a2ac3a9..24cbe37c8ee 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java @@ -2136,7 +2136,12 @@ protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destin msg.setBrokerPath(null); msg.setTransactionId(txid); - if (connection.isCopyMessageOnSend()) { + final ActiveMQMessage originalMessage = msg; + if (connection.isCopyMessageOnSend() || completionListener != null) { + // We need to make the message inaccessible per Jakarta Messaging 3.1 - 7.3.6 & 7.3.9 + // https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#restrictions-on-the-use-of-the-message-object + // To do that, we need to set a flag in the message referenced in sender thread. To avoid making + // the message inaccessible once received on the server side (even tho the WireFormat marshaller doesn't marshal that field, so it shouldn't matter) msg = (ActiveMQMessage)msg.copy(); } msg.setConnection(connection); @@ -2167,10 +2172,12 @@ protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destin // Make the Message object unaccessible and unmutable // per Jakarta Messaging 3.1 spec section 7.3.9 and 7.3.6 numIncompletedAsyncSend.doIncrement(); + originalMessage.setMessageAccessible(false); wrapperCompletionListener = new CompletionListener() { @Override public void onCompletion(Message message) { try { + originalMessage.setMessageAccessible(true); inCompletionListenerCallback.set(true); producerInCompletionListenerCallback.set(true); numIncompletedAsyncSend.doDecrement(); @@ -2188,6 +2195,7 @@ public void onCompletion(Message message) { @Override public void onException(Message message, Exception e) { try { + originalMessage.setMessageAccessible(true); inCompletionListenerCallback.set(true); completionListener.onException(message, e); } finally { diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMessage.java b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMessage.java index 2b9c86dc17b..67e6973fb60 100644 --- a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMessage.java +++ b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMessage.java @@ -27,6 +27,7 @@ import jakarta.jms.DeliveryMode; import jakarta.jms.Destination; import jakarta.jms.JMSException; +import jakarta.jms.JMSRuntimeException; import jakarta.jms.MessageFormatException; import jakarta.jms.MessageNotWriteableException; @@ -53,6 +54,8 @@ public class ActiveMQMessage extends Message implements org.apache.activemq.Mess protected transient Callback acknowledgeCallback; + private volatile boolean messageAccessible = true; + @Override public byte getDataStructureType() { return DATA_STRUCTURE_TYPE; @@ -110,12 +113,14 @@ public void acknowledge() throws JMSException { @Override public void clearBody() throws JMSException { + checkMessageAccessible(); setContent(null); readOnlyBody = false; } @Override - public String getJMSMessageID() { + public String getJMSMessageID() throws JMSException { + checkMessageAccessible(); MessageId messageId = this.getMessageId(); if (messageId == null) { return null; @@ -132,6 +137,7 @@ public String getJMSMessageID() { */ @Override public void setJMSMessageID(String value) throws JMSException { + checkMessageAccessible(); if (value != null) { try { MessageId id = new MessageId(value); @@ -158,6 +164,7 @@ public void setJMSMessageID(String value) throws JMSException { * @throws JMSException */ public void setJMSMessageID(ProducerId producerId, long producerSequenceId) throws JMSException { + checkMessageAccessible(); MessageId id = null; try { id = new MessageId(producerId, producerSequenceId); @@ -168,32 +175,38 @@ public void setJMSMessageID(ProducerId producerId, long producerSequenceId) thro } @Override - public long getJMSTimestamp() { + public long getJMSTimestamp() throws JMSException { + checkMessageAccessible(); return this.getTimestamp(); } @Override - public void setJMSTimestamp(long timestamp) { + public void setJMSTimestamp(long timestamp) throws JMSException { + checkMessageAccessible(); this.setTimestamp(timestamp); } @Override - public String getJMSCorrelationID() { + public String getJMSCorrelationID() throws JMSException { + checkMessageAccessible(); return this.getCorrelationId(); } @Override - public void setJMSCorrelationID(String correlationId) { + public void setJMSCorrelationID(String correlationId) throws JMSException { + checkMessageAccessible(); this.setCorrelationId(correlationId); } @Override public byte[] getJMSCorrelationIDAsBytes() throws JMSException { + checkMessageAccessible(); return encodeString(this.getCorrelationId()); } @Override public void setJMSCorrelationIDAsBytes(byte[] correlationId) throws JMSException { + checkMessageAccessible(); this.setCorrelationId(decodeString(correlationId)); } @@ -225,77 +238,92 @@ protected static byte[] encodeString(String data) throws JMSException { } @Override - public Destination getJMSReplyTo() { + public Destination getJMSReplyTo()throws JMSException { + checkMessageAccessible(); return this.getReplyTo(); } @Override public void setJMSReplyTo(Destination destination) throws JMSException { + checkMessageAccessible(); this.setReplyTo(ActiveMQDestination.transform(destination)); } @Override - public Destination getJMSDestination() { + public Destination getJMSDestination() throws JMSException { + checkMessageAccessible(); return this.getDestination(); } @Override public void setJMSDestination(Destination destination) throws JMSException { + checkMessageAccessible(); this.setDestination(ActiveMQDestination.transform(destination)); } @Override - public int getJMSDeliveryMode() { + public int getJMSDeliveryMode() throws JMSException { + checkMessageAccessible(); return this.isPersistent() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT; } @Override - public void setJMSDeliveryMode(int mode) { + public void setJMSDeliveryMode(int mode) throws JMSException { + checkMessageAccessible(); this.setPersistent(mode == DeliveryMode.PERSISTENT); } @Override - public boolean getJMSRedelivered() { + public boolean getJMSRedelivered() throws JMSException { + checkMessageAccessible(); return this.isRedelivered(); } @Override - public void setJMSRedelivered(boolean redelivered) { + public void setJMSRedelivered(boolean redelivered) throws JMSException { + checkMessageAccessible(); this.setRedelivered(redelivered); } @Override - public String getJMSType() { + public String getJMSType() throws JMSException { + checkMessageAccessible(); return this.getType(); } @Override - public void setJMSType(String type) { + public void setJMSType(String type) throws JMSException { + checkMessageAccessible(); this.setType(type); } @Override - public long getJMSExpiration() { + public long getJMSExpiration() throws JMSException { + checkMessageAccessible(); return this.getExpiration(); } @Override - public void setJMSExpiration(long expiration) { + public void setJMSExpiration(long expiration) throws JMSException { + checkMessageAccessible(); this.setExpiration(expiration); } @Override - public int getJMSPriority() { + public int getJMSPriority() throws JMSException { + checkMessageAccessible(); return this.getPriority(); } @Override - public void setJMSPriority(int priority) { + public void setJMSPriority(int priority) throws JMSException { + checkMessageAccessible(); this.setPriority((byte) priority); } @Override - public void clearProperties() { + public void clearProperties() throws JMSException { + checkMessageAccessible(); super.clearProperties(); readOnlyProperties = false; } @@ -349,7 +377,7 @@ public Enumeration getAllPropertyNames() throws JMSException { } interface PropertySetter { - void set(Message message, Object value) throws MessageFormatException; + void set(Message message, Object value) throws MessageFormatException, JMSException; } static { @@ -385,7 +413,7 @@ public void set(Message message, Object value) throws MessageFormatException { }); JMS_PROPERTY_SETERS.put("JMSCorrelationID", new PropertySetter() { @Override - public void set(Message message, Object value) throws MessageFormatException { + public void set(Message message, Object value) throws MessageFormatException, JMSException { String rc = (String) TypeConversionSupport.convert(value, String.class); if (rc == null) { throw new MessageFormatException("Property JMSCorrelationID cannot be set from a " + value.getClass().getName() + "."); @@ -395,7 +423,7 @@ public void set(Message message, Object value) throws MessageFormatException { }); JMS_PROPERTY_SETERS.put("JMSDeliveryMode", new PropertySetter() { @Override - public void set(Message message, Object value) throws MessageFormatException { + public void set(Message message, Object value) throws MessageFormatException, JMSException { Integer rc = null; try { rc = (Integer) TypeConversionSupport.convert(value, Integer.class); @@ -423,7 +451,7 @@ public void set(Message message, Object value) throws MessageFormatException { }); JMS_PROPERTY_SETERS.put("JMSExpiration", new PropertySetter() { @Override - public void set(Message message, Object value) throws MessageFormatException { + public void set(Message message, Object value) throws MessageFormatException, JMSException { Long rc = (Long) TypeConversionSupport.convert(value, Long.class); if (rc == null) { throw new MessageFormatException("Property JMSExpiration cannot be set from a " + value.getClass().getName() + "."); @@ -433,7 +461,7 @@ public void set(Message message, Object value) throws MessageFormatException { }); JMS_PROPERTY_SETERS.put("JMSPriority", new PropertySetter() { @Override - public void set(Message message, Object value) throws MessageFormatException { + public void set(Message message, Object value) throws MessageFormatException, JMSException { Integer rc = (Integer) TypeConversionSupport.convert(value, Integer.class); if (rc == null) { throw new MessageFormatException("Property JMSPriority cannot be set from a " + value.getClass().getName() + "."); @@ -443,7 +471,7 @@ public void set(Message message, Object value) throws MessageFormatException { }); JMS_PROPERTY_SETERS.put("JMSRedelivered", new PropertySetter() { @Override - public void set(Message message, Object value) throws MessageFormatException { + public void set(Message message, Object value) throws MessageFormatException , JMSException{ Boolean rc = (Boolean) TypeConversionSupport.convert(value, Boolean.class); if (rc == null) { throw new MessageFormatException("Property JMSRedelivered cannot be set from a " + value.getClass().getName() + "."); @@ -463,7 +491,7 @@ public void set(Message message, Object value) throws MessageFormatException { }); JMS_PROPERTY_SETERS.put("JMSTimestamp", new PropertySetter() { @Override - public void set(Message message, Object value) throws MessageFormatException { + public void set(Message message, Object value) throws MessageFormatException, JMSException { Long rc = (Long) TypeConversionSupport.convert(value, Long.class); if (rc == null) { throw new MessageFormatException("Property JMSTimestamp cannot be set from a " + value.getClass().getName() + "."); @@ -473,7 +501,7 @@ public void set(Message message, Object value) throws MessageFormatException { }); JMS_PROPERTY_SETERS.put("JMSType", new PropertySetter() { @Override - public void set(Message message, Object value) throws MessageFormatException { + public void set(Message message, Object value) throws MessageFormatException, JMSException { String rc = (String) TypeConversionSupport.convert(value, String.class); if (rc == null) { throw new MessageFormatException("Property JMSType cannot be set from a " + value.getClass().getName() + "."); @@ -485,11 +513,12 @@ public void set(Message message, Object value) throws MessageFormatException { @Override public void setObjectProperty(String name, Object value) throws JMSException { + checkMessageAccessible(); setObjectProperty(name, value, true); } public void setObjectProperty(String name, Object value, boolean checkReadOnly) throws JMSException { - + checkMessageAccessible(); if (checkReadOnly) { checkReadOnlyProperties(); } @@ -571,10 +600,10 @@ else if (AMQ_SCHEDULED_CRON.equals(name)) { @Override public Object getObjectProperty(String name) throws JMSException { + checkMessageAccessible(); if (name == null) { throw new NullPointerException("Property name cannot be null"); } - // PropertyExpression handles converting message headers to properties. PropertyExpression expression = new PropertyExpression(name); return expression.evaluate(this); @@ -787,16 +816,19 @@ protected boolean isContentMarshalled() { @Override public long getJMSDeliveryTime() throws JMSException { + checkMessageAccessible(); return deliveryTime; } @Override public void setJMSDeliveryTime(long deliveryTime) throws JMSException { + checkMessageAccessible(); this.deliveryTime = deliveryTime; } @Override public final T getBody(Class asType) throws JMSException { + checkMessageAccessible(); if (isBodyAssignableTo(asType)) { return doGetBody(asType); } @@ -813,4 +845,19 @@ protected T doGetBody(Class asType) throws JMSException { return null; } + public void setMessageAccessible(boolean messageAccessible) { + this.messageAccessible = messageAccessible; + } + + private boolean isMessageAccessible() { + return messageAccessible; + } + + private void checkMessageAccessible() throws JMSException { + if (!messageAccessible) { + throw new JMSException( + "Can not access and mutate message, the message is sent asynchronously and its completion listener has not been invoked"); + } + } + } diff --git a/activemq-client/src/main/java/org/apache/activemq/command/Message.java b/activemq-client/src/main/java/org/apache/activemq/command/Message.java index 2a31047c9eb..9a372784828 100644 --- a/activemq-client/src/main/java/org/apache/activemq/command/Message.java +++ b/activemq-client/src/main/java/org/apache/activemq/command/Message.java @@ -215,7 +215,7 @@ public Map getProperties() throws IOException { return Collections.unmodifiableMap(properties); } - public void clearProperties() { + public void clearProperties() throws JMSException { marshalledProperties = null; properties = null; } diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTPacketIdGenerator.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTPacketIdGenerator.java index bf57f1c9f43..771ae19ba06 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTPacketIdGenerator.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTPacketIdGenerator.java @@ -19,6 +19,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import jakarta.jms.JMSException; import org.apache.activemq.Service; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.command.ActiveMQMessage; @@ -67,7 +68,7 @@ public boolean stopClientSession(String clientId) { return clientIdMap.remove(clientId) != null; } - public short setPacketId(String clientId, MQTTSubscription subscription, ActiveMQMessage message, PUBLISH publish) { + public short setPacketId(String clientId, MQTTSubscription subscription, ActiveMQMessage message, PUBLISH publish) throws JMSException { final PacketIdMaps idMaps = clientIdMap.get(clientId); if (idMaps == null) { // maybe its a cleansession=true client id, use session less message id @@ -125,7 +126,7 @@ private class PacketIdMaps { final Map activemqToPacketIds = new LRUCache(MQTTProtocolConverter.DEFAULT_CACHE_SIZE); final Map packetIdsToActivemq = new LRUCache(MQTTProtocolConverter.DEFAULT_CACHE_SIZE); - short setPacketId(MQTTSubscription subscription, ActiveMQMessage message, PUBLISH publish) { + short setPacketId(MQTTSubscription subscription, ActiveMQMessage message, PUBLISH publish) throws JMSException { // subscription key final StringBuilder subscriptionKey = new StringBuilder(); subscriptionKey.append(subscription.getConsumerInfo().getDestination().getPhysicalName()) diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java index ed858e05560..20215577020 100644 --- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java +++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java @@ -51,7 +51,7 @@ static final class Helper { private Helper() { } - public static void copyStandardHeadersFromMessageToFrame(ProtocolConverter converter, ActiveMQMessage message, StompFrame command, FrameTranslator ft) throws IOException { + public static void copyStandardHeadersFromMessageToFrame(ProtocolConverter converter, ActiveMQMessage message, StompFrame command, FrameTranslator ft) throws IOException, JMSException { final Map headers = command.getHeaders(); headers.put(Stomp.Headers.Message.DESTINATION, ft.convertDestination(converter, message.getDestination())); headers.put(Stomp.Headers.Message.MESSAGE_ID, message.getJMSMessageID()); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4893Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4893Test.java index c1f0707b842..cd306299b20 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4893Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4893Test.java @@ -72,7 +72,7 @@ private void roundTripProperties(ActiveMQObjectMessage message) throws IOExcepti } } - private void fakeUnmarshal(ActiveMQObjectMessage message) throws IOException { + private void fakeUnmarshal(ActiveMQObjectMessage message) throws IOException, JMSException { // we need to force the unmarshalled property field to be set so it // gives us a hawtbuffer for the string OpenWireFormat format = new OpenWireFormat(); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQMessageTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQMessageTest.java index badd57a8df1..0bab291c2b1 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQMessageTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQMessageTest.java @@ -193,7 +193,7 @@ public void testGetAndSetJMSMessageID() throws Exception { assertEquals(msg.getJMSMessageID(), this.jmsMessageID); } - public void testGetAndSetJMSTimestamp() { + public void testGetAndSetJMSTimestamp() throws JMSException { ActiveMQMessage msg = new ActiveMQMessage(); msg.setJMSTimestamp(this.jmsTimestamp); assertTrue(msg.getJMSTimestamp() == this.jmsTimestamp); @@ -216,7 +216,7 @@ public void testSetJMSCorrelationIDAsBytes() throws Exception { assertTrue(this.jmsCorrelationID.equals(str2)); } - public void testGetAndSetJMSCorrelationID() { + public void testGetAndSetJMSCorrelationID() throws JMSException { ActiveMQMessage msg = new ActiveMQMessage(); msg.setJMSCorrelationID(this.jmsCorrelationID); assertTrue(msg.getJMSCorrelationID().equals(this.jmsCorrelationID)); @@ -234,31 +234,31 @@ public void testGetAndSetJMSDestination() throws Exception { assertTrue(msg.getJMSDestination().equals(this.jmsDestination)); } - public void testGetAndSetJMSDeliveryMode() { + public void testGetAndSetJMSDeliveryMode() throws JMSException { ActiveMQMessage msg = new ActiveMQMessage(); msg.setJMSDeliveryMode(this.jmsDeliveryMode); assertTrue(msg.getJMSDeliveryMode() == this.jmsDeliveryMode); } - public void testGetAndSetMSRedelivered() { + public void testGetAndSetMSRedelivered()throws JMSException { ActiveMQMessage msg = new ActiveMQMessage(); msg.setJMSRedelivered(this.jmsRedelivered); assertTrue(msg.getJMSRedelivered() == this.jmsRedelivered); } - public void testGetAndSetJMSType() { + public void testGetAndSetJMSType()throws JMSException { ActiveMQMessage msg = new ActiveMQMessage(); msg.setJMSType(this.jmsType); assertTrue(msg.getJMSType().equals(this.jmsType)); } - public void testGetAndSetJMSExpiration() { + public void testGetAndSetJMSExpiration()throws JMSException { ActiveMQMessage msg = new ActiveMQMessage(); msg.setJMSExpiration(this.jmsExpiration); assertTrue(msg.getJMSExpiration() == this.jmsExpiration); } - public void testGetAndSetJMSPriority() { + public void testGetAndSetJMSPriority() throws JMSException { ActiveMQMessage msg = new ActiveMQMessage(); msg.setJMSPriority(this.jmsPriority); assertTrue(msg.getJMSPriority() == this.jmsPriority); @@ -982,11 +982,261 @@ public void testReadOnlyProperties() throws JMSException { } } - public void testIsExpired() { + public void testIsExpired() throws JMSException { ActiveMQMessage msg = new ActiveMQMessage(); msg.setJMSExpiration(System.currentTimeMillis() - 1); assertTrue(msg.isExpired()); msg.setJMSExpiration(System.currentTimeMillis() + 10000); assertFalse(msg.isExpired()); } + + public void testMessageGetterInaccessible() throws JMSException { + ActiveMQMessage msg = new ActiveMQMessage(); + msg.setMessageAccessible(false); + try { + msg.getJMSMessageID(); + fail("getJMSMessageID Should have thrown exception"); + } catch (JMSException e) { + checkIfMessageInaccessibleJMSExcaptionMessageMatch(e); + } + try { + msg.getJMSTimestamp(); + fail("getJMSTimestamp Should have thrown exception"); + } catch (JMSException e) { + checkIfMessageInaccessibleJMSExcaptionMessageMatch(e); + } + try { + msg.getJMSCorrelationIDAsBytes(); + fail("getJMSCorrelationIDAsBytes Should have thrown exception"); + } catch (JMSException e) { + checkIfMessageInaccessibleJMSExcaptionMessageMatch(e); + } + try { + msg.getJMSCorrelationID(); + fail("getJMSCorrelationID Should have thrown exception"); + } catch (JMSException e) { + checkIfMessageInaccessibleJMSExcaptionMessageMatch(e); + } + try { + msg.getJMSReplyTo(); + fail("getJMSReplyTo Should have thrown exception"); + } catch (JMSException e) { + checkIfMessageInaccessibleJMSExcaptionMessageMatch(e); + } + try { + msg.getJMSDestination(); + fail("getJMSDestination Should have thrown exception"); + } catch (JMSException e) { + checkIfMessageInaccessibleJMSExcaptionMessageMatch(e); + } + try { + msg.getJMSDeliveryMode(); + fail("getJMSDeliveryMode Should have thrown exception"); + } catch (JMSException e) { + checkIfMessageInaccessibleJMSExcaptionMessageMatch(e); + } + try { + msg.getJMSRedelivered(); + fail("getJMSRedelivered Should have thrown exception"); + } catch (JMSException e) { + checkIfMessageInaccessibleJMSExcaptionMessageMatch(e); + } + try { + msg.getJMSType(); + fail("getJMSType Should have thrown exception"); + } catch (JMSException e) { + checkIfMessageInaccessibleJMSExcaptionMessageMatch(e); + } + try { + msg.getJMSExpiration(); + fail("getJMSExpiration Should have thrown exception"); + } catch (JMSException e) { + checkIfMessageInaccessibleJMSExcaptionMessageMatch(e); + } + try { + msg.getJMSPriority(); + fail("getJMSPriority Should have thrown exception"); + } catch (JMSException e) { + checkIfMessageInaccessibleJMSExcaptionMessageMatch(e); + } + try { + msg.getBody(String.class); + fail("getBody Should have thrown exception"); + } catch (JMSException e) { + checkIfMessageInaccessibleJMSExcaptionMessageMatch(e); + } + try { + msg.getBooleanProperty(""); + fail("getBooleanProperty Should have thrown exception"); + } catch (JMSException e) { + checkIfMessageInaccessibleJMSExcaptionMessageMatch(e); + } + try { + msg.getShortProperty(""); + fail("getShortProperty Should have thrown exception"); + } catch (JMSException e) { + checkIfMessageInaccessibleJMSExcaptionMessageMatch(e); + } + try { + msg.getIntProperty(""); + fail("getIntProperty Should have thrown exception"); + } catch (JMSException e) { + checkIfMessageInaccessibleJMSExcaptionMessageMatch(e); + } + try { + msg.getLongProperty(""); + fail("getLongProperty Should have thrown exception"); + } catch (JMSException e) { + checkIfMessageInaccessibleJMSExcaptionMessageMatch(e); + } + try { + msg.getFloatProperty(""); + fail("getFloatProperty Should have thrown exception"); + } catch (JMSException e) { + checkIfMessageInaccessibleJMSExcaptionMessageMatch(e); + } + try { + msg.getDoubleProperty(""); + fail("getDoubleProperty Should have thrown exception"); + } catch (JMSException e) { + checkIfMessageInaccessibleJMSExcaptionMessageMatch(e); + } + try { + msg.getStringProperty(""); + fail("getStringProperty Should have thrown exception"); + } catch (JMSException e) { + checkIfMessageInaccessibleJMSExcaptionMessageMatch(e); + } + try { + msg.getObjectProperty(""); + fail("getObjectProperty Should have thrown exception"); + } catch (JMSException e) { + checkIfMessageInaccessibleJMSExcaptionMessageMatch(e); + } + } + + public void testMessageSetterInaccessible() throws JMSException { + ActiveMQMessage msg = new ActiveMQMessage(); + msg.setMessageAccessible(false); + try { + msg.setJMSMessageID(""); + fail("getJMSMessageID Should have thrown exception"); + } catch (JMSException e) { + checkIfMessageInaccessibleJMSExcaptionMessageMatch(e); + } + try { + msg.setJMSTimestamp(0L); + fail("setJMSTimestamp Should have thrown exception"); + } catch (JMSException e) { + checkIfMessageInaccessibleJMSExcaptionMessageMatch(e); + } + try { + msg.setJMSCorrelationIDAsBytes(new byte[]{}); + fail("setJMSCorrelationIDAsBytes Should have thrown exception"); + } catch (JMSException e) { + checkIfMessageInaccessibleJMSExcaptionMessageMatch(e); + } + try { + msg.setJMSCorrelationID(""); + fail("setJMSCorrelationID Should have thrown exception"); + } catch (JMSException e) { + checkIfMessageInaccessibleJMSExcaptionMessageMatch(e); + } + try { + msg.setJMSReplyTo(null); + fail("setJMSReplyTo Should have thrown exception"); + } catch (JMSException e) { + checkIfMessageInaccessibleJMSExcaptionMessageMatch(e); + } + try { + msg.setJMSDestination(null); + fail("setJMSDestination Should have thrown exception"); + } catch (JMSException e) { + checkIfMessageInaccessibleJMSExcaptionMessageMatch(e); + } + try { + msg.setJMSDeliveryMode(0); + fail("setJMSDeliveryMode Should have thrown exception"); + } catch (JMSException e) { + checkIfMessageInaccessibleJMSExcaptionMessageMatch(e); + } + try { + msg.setJMSRedelivered(false); + fail("setJMSRedelivered Should have thrown exception"); + } catch (JMSException e) { + checkIfMessageInaccessibleJMSExcaptionMessageMatch(e); + } + try { + msg.setJMSType(""); + fail("setJMSType Should have thrown exception"); + } catch (JMSException e) { + checkIfMessageInaccessibleJMSExcaptionMessageMatch(e); + } + try { + msg.setJMSExpiration(0L); + fail("setJMSExpiration Should have thrown exception"); + } catch (JMSException e) { + checkIfMessageInaccessibleJMSExcaptionMessageMatch(e); + } + try { + msg.setJMSPriority(0); + fail("setJMSPriority Should have thrown exception"); + } catch (JMSException e) { + checkIfMessageInaccessibleJMSExcaptionMessageMatch(e); + } + try { + msg.setBooleanProperty("proporty", false); + fail("setBooleanProperty Should have thrown exception"); + } catch (JMSException e) { + checkIfMessageInaccessibleJMSExcaptionMessageMatch(e); + } + try { + msg.setShortProperty("proporty", (short)0); + fail("setShortProperty Should have thrown exception"); + } catch (JMSException e) { + checkIfMessageInaccessibleJMSExcaptionMessageMatch(e); + } + try { + msg.setIntProperty("proporty", 0); + fail("setIntProperty Should have thrown exception"); + } catch (JMSException e) { + checkIfMessageInaccessibleJMSExcaptionMessageMatch(e); + } + try { + msg.setLongProperty("proporty", 0L); + fail("setLongProperty Should have thrown exception"); + } catch (JMSException e) { + checkIfMessageInaccessibleJMSExcaptionMessageMatch(e); + } + try { + msg.setFloatProperty("proporty", 0.0f); + fail("setFloatProperty Should have thrown exception"); + } catch (JMSException e) { + checkIfMessageInaccessibleJMSExcaptionMessageMatch(e); + } + try { + msg.setDoubleProperty("proporty", 0.0); + fail("setDoubleProperty Should have thrown exception"); + } catch (JMSException e) { + checkIfMessageInaccessibleJMSExcaptionMessageMatch(e); + } + try { + msg.setStringProperty("proporty", ""); + fail("setStringProperty Should have thrown exception"); + } catch (JMSException e) { + checkIfMessageInaccessibleJMSExcaptionMessageMatch(e); + } + try { + msg.setObjectProperty("proporty", null); + fail("setObjectProperty Should have thrown exception"); + } catch (JMSException e) { + checkIfMessageInaccessibleJMSExcaptionMessageMatch(e); + } + } + + private void checkIfMessageInaccessibleJMSExcaptionMessageMatch(JMSException e) throws JMSException { + if (!e.getMessage().equals("Can not access and mutate message, the message is sent asynchronously and its completion listener has not been invoked")) { + throw new JMSException("JMSException error don't match"); + } + } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2AsyncSendTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2AsyncSendTest.java index 010555c4c62..4475b35b66a 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2AsyncSendTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2AsyncSendTest.java @@ -28,6 +28,7 @@ import jakarta.jms.JMSContext; import jakarta.jms.JMSException; import jakarta.jms.JMSProducer; +import jakarta.jms.JMSRuntimeException; import jakarta.jms.Message; import jakarta.jms.MessageConsumer; import jakarta.jms.Session; @@ -413,10 +414,8 @@ public void onException(Message message, Exception e) { } @Test - public void testAbleToAccessMessageHeaderAfterAsyncSendCompleted_spec7_3_6_spec7_3_9() throws Exception { + public void testUnAbleToAccessMessageHeaderAfterAsyncSendCompleted_spec7_3_6_spec7_3_9() throws Exception { // https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#message-headers - // We won't throw exception because it's optional as stated in the spec. - // "If the Jakarta Messaging provider does not throw an exception then the behaviour is undefined." try(JMSContext jmsContext = activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS, Session.AUTO_ACKNOWLEDGE)) { assertNotNull(jmsContext); JMSProducer jmsProducer = jmsContext.createProducer(); @@ -429,10 +428,8 @@ public void testAbleToAccessMessageHeaderAfterAsyncSendCompleted_spec7_3_6_spec7 @Override public void onCompletion(Message message) { try { - if (!((TextMessage) message).getText().equals(textBody)) { - log.error("messages don't match"); - throw new RuntimeException("messages don't match"); - } + // Running the getter within the CompletionListener shouldn't trigger exception + message.getJMSDeliveryMode(); } catch (JMSException e) { throw new RuntimeException(e); } @@ -448,12 +445,23 @@ public void onException(Message message, Exception e) { TextMessage message = jmsContext.createTextMessage(); message.setText(textBody); jmsProducer.send(destination, message); - // Trying to get the message header - int deliveryMode = message.getJMSDeliveryMode(); + // Test one messages getter, the rest are tested in ActiveMQMessage unit test + try { + message.getJMSDeliveryMode(); + fail("getJMSDeliveryMode didn't throw exception when accessed before CompletionListener is invoked"); + } catch (JMSException e) { + if (!e.getMessage().equals("Can not access and mutate message, the message is sent asynchronously and its completion listener has not been invoked")) { + throw e; + } + log.info("got expected exception for getJMSDeliveryMode: {}", e.getMessage()); + } + boolean status = latch.await(10L, TimeUnit.SECONDS); if (!status) { fail("the completion listener was not triggered within 10 seconds or threw an exception"); } + // Now the message should be unlocked and accessible + message.getJMSDeliveryMode(); } catch (Exception e) { fail(e.getMessage()); } From 603fd7226717cd918d1b3575b41a040899ef7550 Mon Sep 17 00:00:00 2001 From: Ken Liao Date: Sat, 14 Dec 2024 23:34:25 -0800 Subject: [PATCH 3/5] Null check the ThreadLocal markers --- .../main/java/org/apache/activemq/ActiveMQMessageProducer.java | 2 +- .../src/main/java/org/apache/activemq/ActiveMQSession.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java index 6b352c7705b..7f3aef97834 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java @@ -170,7 +170,7 @@ public Destination getDestination() throws JMSException { */ @Override public void close() throws JMSException { - if (inCompletionListenerCallback.get()) { + if (inCompletionListenerCallback != null && inCompletionListenerCallback.get()) { throw new IllegalStateRuntimeException("Can't close message producer within CompletionListener"); } if (!closed) { diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java index 24cbe37c8ee..3439f23bd77 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java @@ -606,7 +606,7 @@ public void commit() throws JMSException { @Override public void rollback() throws JMSException { checkClosed(); - if (inCompletionListenerCallback.get()) { + if (inCompletionListenerCallback.get() != null && inCompletionListenerCallback.get()) { throw new IllegalStateRuntimeException("Can't rollback transacted session within CompletionListener"); } if (!getTransacted()) { From 1e77c7140373759bd0f6f49295a71e5b8d419dc3 Mon Sep 17 00:00:00 2001 From: Ken Liao Date: Sun, 15 Dec 2024 14:56:37 -0800 Subject: [PATCH 4/5] Implement 7.3.8, send Async Send one at a time. Also make sure producer close is handled correctly according to 7.3.4 --- .../activemq/ActiveMQMessageProducer.java | 10 +- .../org/apache/activemq/ActiveMQSession.java | 99 +++++++++----- .../jms2/ActiveMQJMS2AsyncSendTest.java | 121 +++++++++++++++++- 3 files changed, 193 insertions(+), 37 deletions(-) diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java index 7f3aef97834..6ece8907007 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java @@ -35,6 +35,7 @@ import org.apache.activemq.management.StatsCapable; import org.apache.activemq.management.StatsImpl; import org.apache.activemq.usage.MemoryUsage; +import org.apache.activemq.util.CountdownLock; import org.apache.activemq.util.IntrospectionSupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -84,6 +85,7 @@ public class ActiveMQMessageProducer extends ActiveMQMessageProducerSupport impl private MessageTransformer transformer; private MemoryUsage producerWindow; private final ThreadLocal inCompletionListenerCallback = new ThreadLocal<>(); + private final CountdownLock numIncompleteAsyncSend = new CountdownLock(); protected ActiveMQMessageProducer(ActiveMQSession session, ProducerId producerId, ActiveMQDestination destination, int sendTimeout) throws JMSException { super(session); @@ -173,6 +175,7 @@ public void close() throws JMSException { if (inCompletionListenerCallback != null && inCompletionListenerCallback.get()) { throw new IllegalStateRuntimeException("Can't close message producer within CompletionListener"); } + waitForAsyncSendToFinish(); if (!closed) { dispose(); this.session.asyncSendPacket(info.createRemoveCommand()); @@ -232,7 +235,7 @@ public void send(Destination destination, Message message, int deliveryMode, int /** * * @param message the message to send - * @param CompletionListener to callback + * @param completionListener to callback * @throws JMSException if the JMS provider fails to send the message due to * some internal error. * @throws UnsupportedOperationException if an invalid destination is @@ -319,7 +322,7 @@ public void send(Destination destination, Message message, int deliveryMode, int } this.session.send(this, dest, message, deliveryMode, priority, timeToLive, disableMessageID, - disableMessageTimestamp, producerWindow, sendTimeout, completionListener, inCompletionListenerCallback); + disableMessageTimestamp, producerWindow, sendTimeout, completionListener, inCompletionListenerCallback, numIncompleteAsyncSend); stats.onMessage(); } @@ -455,4 +458,7 @@ public void onProducerAck(ProducerAck pa) { } } + private void waitForAsyncSendToFinish() { + numIncompleteAsyncSend.doWaitForZero(); + } } diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java index 3439f23bd77..edb880d2d5a 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java @@ -26,12 +26,16 @@ import java.util.Iterator; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import jakarta.jms.BytesMessage; import jakarta.jms.CompletionListener; +import jakarta.jms.Connection; import jakarta.jms.Destination; import jakarta.jms.IllegalStateException; import jakarta.jms.IllegalStateRuntimeException; @@ -236,8 +240,9 @@ public static interface DeliveryListener { private MessageTransformer transformer; private BlobTransferPolicy blobTransferPolicy; private long lastDeliveredSequenceId = -2; - private CountdownLock numIncompletedAsyncSend = new CountdownLock(); + private CountdownLock numIncompleteAsyncSend = new CountdownLock(); private ThreadLocal inCompletionListenerCallback = new ThreadLocal<>(); + private ExecutorService asyncSendExecutor; /** * Construct the Session @@ -265,6 +270,7 @@ protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, in setBlobTransferPolicy(connection.getBlobTransferPolicy()); this.connectionExecutor=connection.getExecutor(); this.executor = new ActiveMQSessionExecutor(this); + asyncSendExecutor = Executors.newSingleThreadExecutor(); connection.addSession(this); if (connection.isStarted()) { start(); @@ -2080,11 +2086,12 @@ protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destin * @param producerWindow * @param completionListener * @param producerInCompletionListenerCallback + * @param producerNumIncompleteSend * @throws JMSException */ protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive, - boolean disableMessageID, boolean disableMessageTimestamp, MemoryUsage producerWindow, int sendTimeout, - CompletionListener completionListener, ThreadLocal producerInCompletionListenerCallback) throws JMSException { + boolean disableMessageID, boolean disableMessageTimestamp, MemoryUsage producerWindow, int sendTimeout, CompletionListener completionListener, + ThreadLocal producerInCompletionListenerCallback, CountdownLock producerNumIncompleteSend) throws JMSException { checkClosed(); if (destination.isTemporary() && connection.isDeleted(destination)) { @@ -2167,45 +2174,71 @@ protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destin if (sendTimeout > 0 && completionListener==null) { this.connection.syncSendPacket(msg, sendTimeout); }else { - CompletionListener wrapperCompletionListener = null; if (completionListener != null) { + final ActiveMQMessage finalMsgRef = msg; // Make the Message object unaccessible and unmutable // per Jakarta Messaging 3.1 spec section 7.3.9 and 7.3.6 - numIncompletedAsyncSend.doIncrement(); + numIncompleteAsyncSend.doIncrement(); + producerNumIncompleteSend.doIncrement(); originalMessage.setMessageAccessible(false); - wrapperCompletionListener = new CompletionListener() { + // Submit to a another async thread for sending the message asynchronously + // The reason is we want to execute async send and invoke its CompletionListner one at a time + // https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#use-of-the-completionlistener-by-the-jakarta-messaging-provider + asyncSendExecutor.submit(new Runnable() { @Override - public void onCompletion(Message message) { - try { - originalMessage.setMessageAccessible(true); - inCompletionListenerCallback.set(true); - producerInCompletionListenerCallback.set(true); - numIncompletedAsyncSend.doDecrement(); - completionListener.onCompletion(message); - } catch (Exception e) { - // invoke onException if the exception can't be thrown in the thread that calls the send - // per Jakarta Messaging 3.1 spec section 7.3.2 - completionListener.onException(message, e); - } finally { - inCompletionListenerCallback.set(false); - producerInCompletionListenerCallback.set(false); - } - } + public void run() { + CountDownLatch isAsyncSendCompleted = new CountDownLatch(1); + CompletionListener wrapperCompletionListener = new CompletionListener() { + @Override + public void onCompletion(Message message) { + try { + originalMessage.setMessageAccessible(true); + inCompletionListenerCallback.set(true); + producerInCompletionListenerCallback.set(true); + // Invoke application provided completionListener + completionListener.onCompletion(message); + } catch (Exception e) { + // invoke onException if the exception can't be thrown in the thread that calls the send + // per Jakarta Messaging 3.1 spec section 7.3.2 + completionListener.onException(message, e); + } finally { + inCompletionListenerCallback.set(false); + producerInCompletionListenerCallback.set(false); + numIncompleteAsyncSend.doDecrement(); + producerNumIncompleteSend.doDecrement(); + isAsyncSendCompleted.countDown(); + } + } - @Override - public void onException(Message message, Exception e) { + @Override + public void onException(Message message, Exception e) { + try { + originalMessage.setMessageAccessible(true); + inCompletionListenerCallback.set(true); + producerInCompletionListenerCallback.set(true); + completionListener.onException(message, e); + } finally { + inCompletionListenerCallback.set(false); + producerInCompletionListenerCallback.set(false); + numIncompleteAsyncSend.doDecrement(); + producerNumIncompleteSend.doDecrement(); + isAsyncSendCompleted.countDown(); + } + } + }; try { - originalMessage.setMessageAccessible(true); - inCompletionListenerCallback.set(true); - completionListener.onException(message, e); - } finally { - numIncompletedAsyncSend.doDecrement(); - inCompletionListenerCallback.set(false); + connection.syncSendPacket(finalMsgRef, wrapperCompletionListener); + isAsyncSendCompleted.await(); + } catch (JMSException e) { + completionListener.onException(finalMsgRef, e); + } catch (InterruptedException e) { + completionListener.onException(finalMsgRef, e); } } - }; + }); + } else { + this.connection.syncSendPacket(msg, (CompletionListener) null); } - this.connection.syncSendPacket(msg, wrapperCompletionListener); } } @@ -2504,6 +2537,6 @@ private static void setForeignMessageDeliveryTime(final Message foreignMessage, } private void waitForAsyncSendToFinish() { - numIncompletedAsyncSend.doWaitForZero(); + numIncompleteAsyncSend.doWaitForZero(); } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2AsyncSendTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2AsyncSendTest.java index 4475b35b66a..bb780f5971a 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2AsyncSendTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2AsyncSendTest.java @@ -31,6 +31,7 @@ import jakarta.jms.JMSRuntimeException; import jakarta.jms.Message; import jakarta.jms.MessageConsumer; +import jakarta.jms.MessageProducer; import jakarta.jms.Session; import jakarta.jms.TextMessage; import org.junit.Test; @@ -224,6 +225,36 @@ public void onException(Message message, Exception e) { } } + @Test + public void testUnableToCloseSessionInCompletionListener_spec_7_3_4() throws Exception { + // https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#close-commit-or-rollback + Destination destination = session.createQueue(methodNameDestinationName); + MessageProducer messageProducer = session.createProducer(destination); + String textBody = "Test-" + methodNameDestinationName; + CountDownLatch latch = new CountDownLatch(1); + CompletionListener completionListener = new CompletionListener() { + + @Override + public void onCompletion(Message message) { + try { + session.close(); // This should cause a RuntimeException to throw and trigger the onException + } catch (JMSException e) { + throw new RuntimeException(e); + } + } + + @Override + public void onException(Message message, Exception e) { + latch.countDown(); + } + }; + messageProducer.send(destination, session.createTextMessage(textBody), completionListener); + boolean status = latch.await(10L, TimeUnit.SECONDS); + if (!status) { + fail("the completion listener onException was not triggered within 10 seconds or threw an exception"); + } + } + @Test public void testUnableToCloseProducerInCompletionListener_spec_7_3_4() throws Exception { // https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#close-commit-or-rollback @@ -322,7 +353,93 @@ public void onException(Message message, Exception e) { } @Test - public void testCloseContextWailUntilAllIncompleteSentToFinish_spec_7_3_4() throws Exception { + public void testSessionCloseBlockUntilAllAsyncSendFinish_spec_7_3_4() throws Exception { + // https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#close-commit-or-rollback + Destination destination = session.createQueue(methodNameDestinationName); + MessageProducer messageProducer = session.createProducer(destination); + ArrayList expectedOrderedMessages = new ArrayList<>(); + ArrayList actualOrderedMessages = new ArrayList<>(); + Object mutex = new Object(); + int num_msgs = 100; + CompletionListener completionListener = new CompletionListener() { + @Override + public void onCompletion(Message message) { + synchronized (mutex) { + try { + String text = ((TextMessage) message).getText(); + actualOrderedMessages.add(text); + } catch (JMSException e) { + throw new RuntimeException(e); + } + } + } + + @Override + public void onException(Message message, Exception e) { + throw new RuntimeException(e); + } + }; + + for (int i = 0; i < num_msgs; i++) { + String textBody = "Test-" + methodNameDestinationName + "-" + String.valueOf(i); + expectedOrderedMessages.add(textBody); + messageProducer.send( + destination, + session.createTextMessage(textBody), + completionListener); + } + session.close(); + if (expectedOrderedMessages.size() != actualOrderedMessages.size()) { + fail("jmsContext doesn't wait until all inComplete send to finish"); + } + } + + @Test + public void testProducerCloseBlockUntilAllAsyncSendFinish_spec_7_3_4() throws Exception { + // https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#close-commit-or-rollback + Destination destination = session.createQueue(methodNameDestinationName); + MessageProducer messageProducer = session.createProducer(destination); + ArrayList expectedOrderedMessages = new ArrayList<>(); + ArrayList actualOrderedMessages = new ArrayList<>(); + Object mutex = new Object(); + int num_msgs = 100; + CompletionListener completionListener = new CompletionListener() { + @Override + public void onCompletion(Message message) { + synchronized (mutex) { + try { + String text = ((TextMessage) message).getText(); + actualOrderedMessages.add(text); + } catch (JMSException e) { + throw new RuntimeException(e); + } + } + } + + @Override + public void onException(Message message, Exception e) { + throw new RuntimeException(e); + } + }; + + for (int i = 0; i < num_msgs; i++) { + String textBody = "Test-" + methodNameDestinationName + "-" + String.valueOf(i); + expectedOrderedMessages.add(textBody); + messageProducer.send( + destination, + session.createTextMessage(textBody), + completionListener); + } + messageProducer.close(); + if (expectedOrderedMessages.size() != actualOrderedMessages.size()) { + fail("jmsContext doesn't wait until all inComplete send to finish"); + } + } + + + + @Test + public void testContextCloseBlockUntilAllAsyncSendFinish_spec_7_3_4() throws Exception { // https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#close-commit-or-rollback try(JMSContext jmsContext = activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS, Session.AUTO_ACKNOWLEDGE)) { assertNotNull(jmsContext); @@ -368,7 +485,7 @@ public void onException(Message message, Exception e) { } @Test - public void testCommitContextWailUntilAllIncompleteSentToFinish_spec_7_3_4() throws Exception { + public void testContextCommitBlockUntilAllAsyncSendFinish_spec_7_3_4() throws Exception { // https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#close-commit-or-rollback try(JMSContext jmsContext = activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS, Session.SESSION_TRANSACTED)) { assertNotNull(jmsContext); From 3f8a6cb68fca330de9dd4037470577bff15b0759 Mon Sep 17 00:00:00 2001 From: Ken Liao Date: Sun, 15 Dec 2024 20:47:58 -0800 Subject: [PATCH 5/5] Fix failed unit tests --- .../org/apache/activemq/ActiveMQSession.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java index edb880d2d5a..63c77828f5b 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java @@ -587,9 +587,7 @@ public int getAcknowledgeMode() throws JMSException { @Override public void commit() throws JMSException { checkClosed(); - if (inCompletionListenerCallback.get()) { - throw new IllegalStateRuntimeException("Can't commit transacted session within CompletionListener"); - } + checkIsSessionUseAllowed("Can't commit transacted session within CompletionListener"); if (!getTransacted()) { throw new jakarta.jms.IllegalStateException("Not a transacted session"); } @@ -612,9 +610,7 @@ public void commit() throws JMSException { @Override public void rollback() throws JMSException { checkClosed(); - if (inCompletionListenerCallback.get() != null && inCompletionListenerCallback.get()) { - throw new IllegalStateRuntimeException("Can't rollback transacted session within CompletionListener"); - } + checkIsSessionUseAllowed("Can't rollback transacted session within CompletionListener"); if (!getTransacted()) { throw new jakarta.jms.IllegalStateException("Not a transacted session"); } @@ -656,9 +652,7 @@ public void rollback() throws JMSException { @Override public void close() throws JMSException { if (!closed) { - if (inCompletionListenerCallback.get()) { - throw new IllegalStateRuntimeException("Can't close session within CompletionListener"); - } + checkIsSessionUseAllowed("Can't close session within CompletionListener"); if (getTransactionContext().isInXATransaction()) { if (!synchronizationRegistered) { synchronizationRegistered = true; @@ -2539,4 +2533,10 @@ private static void setForeignMessageDeliveryTime(final Message foreignMessage, private void waitForAsyncSendToFinish() { numIncompleteAsyncSend.doWaitForZero(); } + + private void checkIsSessionUseAllowed(String errorMsg) { + if (inCompletionListenerCallback.get() != null && inCompletionListenerCallback.get()) { + throw new IllegalStateRuntimeException(errorMsg); + } + } }