diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java index c8923249405..73dd3c57f6f 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java @@ -82,6 +82,8 @@ public static String getMirrorAddress(String connectionName) { private final Map prefixes = new HashMap<>(); + private final Map temporaryPrefixes = new HashMap<>(); + /** * minLargeMessageSize determines when a message should be considered as large. minLargeMessageSize = -1 basically * disables large message control over AMQP. @@ -391,11 +393,32 @@ public void setMulticastPrefix(String multicastPrefix) { } } + @Override + public void setTemporaryAnycastPrefix(String temporaryAnycastPrefix) { + for (String prefix : temporaryAnycastPrefix.split(",")) { + prefixes.put(SimpleString.of(prefix), RoutingType.ANYCAST); + temporaryPrefixes.put(SimpleString.of(prefix), RoutingType.ANYCAST); + } + } + + @Override + public void setTemporaryMulticastPrefix(String temporaryMulticastPrefix) { + for (String prefix : temporaryMulticastPrefix.split(",")) { + prefixes.put(SimpleString.of(prefix), RoutingType.MULTICAST); + temporaryPrefixes.put(SimpleString.of(prefix), RoutingType.MULTICAST); + } + } + @Override public Map getPrefixes() { return prefixes; } + @Override + public Map getTemporaryPrefixes() { + return temporaryPrefixes; + } + @Override public AMQPRoutingHandler getRoutingHandler() { return routingHandler; diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java index 7b4375837ed..1299805a7d4 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java @@ -163,6 +163,8 @@ public OpenWireProtocolManager setOpenwireMaxPacketChunkSize(int openwireMaxPack private final Map prefixes = new HashMap<>(); + private final Map temporaryPrefixes = new HashMap<>(); + private final List incomingInterceptors = new ArrayList<>(); private final List outgoingInterceptors = new ArrayList<>(); @@ -686,11 +688,32 @@ public void setMulticastPrefix(String multicastPrefix) { } } + @Override + public void setTemporaryAnycastPrefix(String temporaryAnycastPrefix) { + for (String prefix : temporaryAnycastPrefix.split(",")) { + prefixes.put(SimpleString.of(prefix), RoutingType.ANYCAST); + temporaryPrefixes.put(SimpleString.of(prefix), RoutingType.ANYCAST); + } + } + + @Override + public void setTemporaryMulticastPrefix(String temporaryMulticastPrefix) { + for (String prefix : temporaryMulticastPrefix.split(",")) { + prefixes.put(SimpleString.of(prefix), RoutingType.MULTICAST); + temporaryPrefixes.put(SimpleString.of(prefix), RoutingType.MULTICAST); + } + } + @Override public Map getPrefixes() { return prefixes; } + @Override + public Map getTemporaryPrefixes() { + return temporaryPrefixes; + } + @Override public void setSecurityDomain(String securityDomain) { this.securityDomain = securityDomain; diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java index 3f7636226db..7ee2f94ffa4 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java @@ -549,11 +549,14 @@ StompPostReceiptFunction subscribe(String destination, String durableSubscriptionName, boolean noLocal, RoutingType subscriptionType, - Integer consumerWindowSize) throws ActiveMQStompException { + Integer consumerWindowSize, + RoutingType temporaryRoutingType) throws ActiveMQStompException { validateSelector(selector); - autoCreateDestinationIfPossible(destination, subscriptionType); - checkDestination(destination); - checkRoutingSemantics(destination, subscriptionType); + if (temporaryRoutingType == null) { + autoCreateDestinationIfPossible(destination, subscriptionType); + checkDestination(destination); + checkRoutingSemantics(destination, subscriptionType); + } if (noLocal) { String noLocalFilter = "(" + CONNECTION_ID_PROPERTY_NAME_STRING + " <> '" + getID().toString() + "' OR " + CONNECTION_ID_PROPERTY_NAME_STRING + " IS NULL)"; if (selector == null) { @@ -578,7 +581,7 @@ StompPostReceiptFunction subscribe(String destination, } try { - return manager.subscribe(this, subscriptionID, durableSubscriptionName, destination, selector, ack, noLocal, consumerWindowSize); + return manager.subscribe(this, subscriptionID, durableSubscriptionName, destination, selector, ack, noLocal, consumerWindowSize, temporaryRoutingType); } catch (ActiveMQStompException e) { throw e; } catch (Exception e) { diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java index 83c9710b3ee..bb41f7dce58 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java @@ -29,6 +29,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.BaseInterceptor; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.core.io.IOCallback; @@ -358,14 +359,15 @@ public StompPostReceiptFunction subscribe(StompConnection connection, String selector, String ack, boolean noLocal, - Integer consumerWindowSize) throws Exception { + Integer consumerWindowSize, + RoutingType temporaryRoutingType) throws Exception { StompSession stompSession = getSession(connection); if (stompSession.containsSubscription(subscriptionID)) { throw new ActiveMQStompException(connection, "There already is a subscription for: " + subscriptionID + ". Either use unique subscription IDs or do not create multiple subscriptions for the same destination"); } long consumerID = server.getStorageManager().generateID(); - return stompSession.addSubscription(consumerID, subscriptionID, connection.getClientID(), durableSubscriptionName, destination, selector, ack, noLocal, consumerWindowSize); + return stompSession.addSubscription(consumerID, subscriptionID, connection.getClientID(), durableSubscriptionName, destination, selector, ack, noLocal, consumerWindowSize, temporaryRoutingType); } public void unsubscribe(StompConnection connection, diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java index 9444ee58eba..4d1ce02a243 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java @@ -313,7 +313,8 @@ public StompPostReceiptFunction addSubscription(long consumerID, String selector, String ack, boolean noLocal, - Integer consumerWindowSize) throws Exception { + Integer consumerWindowSize, + RoutingType temporaryRoutingType) throws Exception { SimpleString address = SimpleString.of(destination); SimpleString queueName = SimpleString.of(destination); SimpleString selectorSimple = SimpleString.of(selector); @@ -327,26 +328,42 @@ public StompPostReceiptFunction addSubscription(long consumerID, finalConsumerWindowSize = ConfigurationHelper.getIntProperty(TransportConstants.STOMP_CONSUMER_WINDOW_SIZE, ConfigurationHelper.getIntProperty(TransportConstants.STOMP_CONSUMERS_CREDIT, TransportConstants.STOMP_DEFAULT_CONSUMER_WINDOW_SIZE, connection.getAcceptorUsed().getConfiguration()), connection.getAcceptorUsed().getConfiguration()); } - Set routingTypes = manager.getServer().getAddressInfo(getCoreSession().removePrefix(address)).getRoutingTypes(); - boolean multicast = routingTypes.size() == 1 && routingTypes.contains(RoutingType.MULTICAST); - // if the destination is FQQN then the queue will have already been created - if (multicast && !CompositeAddress.isFullyQualified(destination)) { - // subscribes to a topic - if (durableSubscriptionName != null) { - if (clientID == null) { - throw BUNDLE.missingClientID(); + boolean multicast; + if (temporaryRoutingType != null) { + multicast = temporaryRoutingType == RoutingType.MULTICAST; + if (multicast) { + queueName = UUIDGenerator.getInstance().generateSimpleStringUUID(); + session.createQueue(QueueConfiguration.of(queueName).setAddress(address).setRoutingType(RoutingType.MULTICAST).setFilterString(selectorSimple).setDurable(false).setTemporary(true)); + } else { + try { + session.createQueue(QueueConfiguration.of(address).setAddress(address).setRoutingType(RoutingType.ANYCAST).setFilterString(selectorSimple).setDurable(false).setTemporary(true)); + } catch (ActiveMQQueueExistsException e) { + // queue may already exist if a sender created it first } - queueName = SimpleString.of(clientID + "." + durableSubscriptionName); - if (manager.getServer().locateQueue(queueName) == null) { - try { - session.createQueue(QueueConfiguration.of(queueName).setAddress(address).setFilterString(selectorSimple)); - } catch (ActiveMQQueueExistsException e) { - // ignore; can be caused by concurrent durable subscribers + queueName = address; + } + } else { + Set routingTypes = manager.getServer().getAddressInfo(getCoreSession().removePrefix(address)).getRoutingTypes(); + multicast = routingTypes.size() == 1 && routingTypes.contains(RoutingType.MULTICAST); + // if the destination is FQQN then the queue will have already been created + if (multicast && !CompositeAddress.isFullyQualified(destination)) { + // subscribes to a topic + if (durableSubscriptionName != null) { + if (clientID == null) { + throw BUNDLE.missingClientID(); } + queueName = SimpleString.of(clientID + "." + durableSubscriptionName); + if (manager.getServer().locateQueue(queueName) == null) { + try { + session.createQueue(QueueConfiguration.of(queueName).setAddress(address).setFilterString(selectorSimple)); + } catch (ActiveMQQueueExistsException e) { + // ignore; can be caused by concurrent durable subscribers + } + } + } else { + queueName = UUIDGenerator.getInstance().generateSimpleStringUUID(); + session.createQueue(QueueConfiguration.of(queueName).setAddress(address).setFilterString(selectorSimple).setDurable(false).setTemporary(true)); } - } else { - queueName = UUIDGenerator.getInstance().generateSimpleStringUUID(); - session.createQueue(QueueConfiguration.of(queueName).setAddress(address).setFilterString(selectorSimple).setDurable(false).setTemporary(true)); } } if (noLocal) { diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java index 1f09cc9c6bb..bff9727c7f8 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java @@ -18,6 +18,7 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.util.Map; import java.util.Objects; import java.util.concurrent.ScheduledExecutorService; @@ -263,6 +264,8 @@ public StompFrame onAbort(StompFrame request) { } public StompPostReceiptFunction onSubscribe(StompFrame frame) throws Exception { + String rawDestination = frame.getHeader(Headers.Subscribe.DESTINATION); + RoutingType temporaryRoutingType = getTemporaryRoutingType(rawDestination); String destination = getDestination(frame); String selector = frame.getHeader(Stomp.Headers.Subscribe.SELECTOR); @@ -288,7 +291,19 @@ public StompPostReceiptFunction onSubscribe(StompFrame frame) throws Exception { } else if (frame.hasHeader(Headers.Subscribe.ACTIVEMQ_PREFETCH_SIZE)) { consumerWindowSize = Integer.parseInt(frame.getHeader(Stomp.Headers.Subscribe.ACTIVEMQ_PREFETCH_SIZE)); } - return connection.subscribe(destination, selector, ack, id, durableSubscriptionName, noLocal, routingType, consumerWindowSize); + return connection.subscribe(destination, selector, ack, id, durableSubscriptionName, noLocal, routingType, consumerWindowSize, temporaryRoutingType); + } + + private RoutingType getTemporaryRoutingType(String rawDestination) { + if (rawDestination != null) { + SimpleString dest = SimpleString.of(rawDestination); + for (Map.Entry entry : connection.getManager().getTemporaryPrefixes().entrySet()) { + if (dest.startsWith(entry.getKey())) { + return entry.getValue(); + } + } + } + return null; } public String getDestination(StompFrame request) throws Exception { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java index c8a17b09a57..6e86e76cab6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java @@ -94,6 +94,8 @@ public class CoreProtocolManager implements ProtocolManager prefixes = new HashMap<>(); + private final Map temporaryPrefixes = new HashMap<>(); + private String securityDomain; private final ActiveMQRoutingHandler routingHandler; @@ -227,11 +229,32 @@ public void setMulticastPrefix(String multicastPrefix) { } } + @Override + public void setTemporaryAnycastPrefix(String temporaryAnycastPrefix) { + for (String prefix : temporaryAnycastPrefix.split(",")) { + prefixes.put(SimpleString.of(prefix), RoutingType.ANYCAST); + temporaryPrefixes.put(SimpleString.of(prefix), RoutingType.ANYCAST); + } + } + + @Override + public void setTemporaryMulticastPrefix(String temporaryMulticastPrefix) { + for (String prefix : temporaryMulticastPrefix.split(",")) { + prefixes.put(SimpleString.of(prefix), RoutingType.MULTICAST); + temporaryPrefixes.put(SimpleString.of(prefix), RoutingType.MULTICAST); + } + } + @Override public Map getPrefixes() { return prefixes; } + @Override + public Map getTemporaryPrefixes() { + return temporaryPrefixes; + } + @Override public void setSecurityDomain(String securityDomain) { this.securityDomain = securityDomain; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractProtocolManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractProtocolManager.java index 4b10b90a599..689f5e44a98 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractProtocolManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractProtocolManager.java @@ -33,6 +33,8 @@ public abstract class AbstractProtocolManager, C private final Map prefixes = new HashMap<>(); + private final Map temporaryPrefixes = new HashMap<>(); + private String securityDomain; protected String invokeInterceptors(final List interceptors, final P message, final C connection) { @@ -65,11 +67,32 @@ public void setMulticastPrefix(String multicastPrefix) { } } + @Override + public void setTemporaryAnycastPrefix(String temporaryAnycastPrefix) { + for (String prefix : temporaryAnycastPrefix.split(",")) { + prefixes.put(SimpleString.of(prefix), RoutingType.ANYCAST); + temporaryPrefixes.put(SimpleString.of(prefix), RoutingType.ANYCAST); + } + } + + @Override + public void setTemporaryMulticastPrefix(String temporaryMulticastPrefix) { + for (String prefix : temporaryMulticastPrefix.split(",")) { + prefixes.put(SimpleString.of(prefix), RoutingType.MULTICAST); + temporaryPrefixes.put(SimpleString.of(prefix), RoutingType.MULTICAST); + } + } + @Override public Map getPrefixes() { return prefixes; } + @Override + public Map getTemporaryPrefixes() { + return temporaryPrefixes; + } + @Override public String getSecurityDomain() { return securityDomain; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java index 60b94c639bd..f1e774d42b8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java @@ -75,8 +75,14 @@ default void removeHandler(String name) { void setMulticastPrefix(String multicastPrefix); + void setTemporaryAnycastPrefix(String temporaryAnycastPrefix); + + void setTemporaryMulticastPrefix(String temporaryMulticastPrefix); + Map getPrefixes(); + Map getTemporaryPrefixes(); + void setSecurityDomain(String securityDomain); String getSecurityDomain(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTemporaryPrefixTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTemporaryPrefixTest.java new file mode 100644 index 00000000000..466d67ccdaf --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTemporaryPrefixTest.java @@ -0,0 +1,335 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.stomp; + +import java.net.URI; +import java.util.Collection; +import java.util.UUID; + +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.postoffice.Binding; +import org.apache.activemq.artemis.core.postoffice.QueueBinding; +import org.apache.activemq.artemis.core.protocol.stomp.Stomp; +import org.apache.activemq.artemis.core.protocol.stomp.StompProtocolManagerFactory; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame; +import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection; +import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory; +import org.apache.activemq.artemis.tests.util.Wait; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Integration tests for the temporaryAnycastPrefix and temporaryMulticastPrefix acceptor parameters. + * + * These prefixes allow STOMP clients to subscribe to destinations using a well-known prefix + * (e.g. /temp-queue/ or /temp-topic/) to create temporary resources that are automatically + * deleted when the client disconnects. This mirrors the behaviour of ActiveMQ 5.x. + */ +public class StompTemporaryPrefixTest extends StompTestBase { + + private static final String TEMP_QUEUE_PREFIX = "/temp-queue/"; + private static final String TEMP_TOPIC_PREFIX = "/temp-topic/"; + private static final int TEST_PORT = 61614; + + public StompTemporaryPrefixTest() { + super("tcp+v10.stomp"); + } + + @Override + protected ActiveMQServer createServer() throws Exception { + ActiveMQServer server = super.createServer(); + server.getConfiguration().setAddressQueueScanPeriod(100); + return server; + } + + // --- temporary queue (ANYCAST) tests --- + + @Test + public void testTemporaryQueuePrefixSubscribeCreatesTemporaryAnycastQueue() throws Exception { + URI uri = createStompClientUri(scheme, hostname, TEST_PORT); + String address = UUID.randomUUID().toString(); + + server.getRemotingService().createAcceptor("test", + "tcp://" + hostname + ":" + TEST_PORT + "?protocols=" + StompProtocolManagerFactory.STOMP_PROTOCOL_NAME + + "&temporaryAnycastPrefix=" + TEMP_QUEUE_PREFIX).start(); + + StompClientConnection conn = StompClientConnectionFactory.createClientConnection(uri); + conn.connect(defUser, defPass); + + String receiptId = UUID.randomUUID().toString(); + ClientStompFrame frame = conn.createFrame(Stomp.Commands.SUBSCRIBE) + .addHeader(Stomp.Headers.Subscribe.DESTINATION, TEMP_QUEUE_PREFIX + address) + .addHeader(Stomp.Headers.RECEIPT_REQUESTED, receiptId); + frame = conn.sendFrame(frame); + assertEquals(receiptId, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID)); + + org.apache.activemq.artemis.core.server.Queue queue = server.locateQueue(SimpleString.of(address)); + assertNotNull(queue, "Subscribing with temporaryAnycastPrefix should create an ANYCAST queue"); + assertTrue(queue.isTemporary(), "Queue created by temporaryAnycastPrefix should be temporary"); + assertEquals(RoutingType.ANYCAST, queue.getRoutingType()); + + conn.disconnect(); + } + + @Test + public void testTemporaryQueueDeletedOnDisconnect() throws Exception { + URI uri = createStompClientUri(scheme, hostname, TEST_PORT); + String address = UUID.randomUUID().toString(); + + server.getRemotingService().createAcceptor("test", + "tcp://" + hostname + ":" + TEST_PORT + "?protocols=" + StompProtocolManagerFactory.STOMP_PROTOCOL_NAME + + "&temporaryAnycastPrefix=" + TEMP_QUEUE_PREFIX).start(); + + StompClientConnection conn = StompClientConnectionFactory.createClientConnection(uri); + conn.connect(defUser, defPass); + + String receiptId = UUID.randomUUID().toString(); + ClientStompFrame frame = conn.createFrame(Stomp.Commands.SUBSCRIBE) + .addHeader(Stomp.Headers.Subscribe.DESTINATION, TEMP_QUEUE_PREFIX + address) + .addHeader(Stomp.Headers.RECEIPT_REQUESTED, receiptId); + frame = conn.sendFrame(frame); + assertEquals(receiptId, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID)); + assertNotNull(server.locateQueue(SimpleString.of(address))); + + conn.disconnect(); + + Wait.assertTrue("Temporary ANYCAST queue should be deleted after client disconnects", + () -> server.locateQueue(SimpleString.of(address)) == null); + } + + @Test + public void testSendReceiveViaTemporaryQueue() throws Exception { + URI uri = createStompClientUri(scheme, hostname, TEST_PORT); + String address = UUID.randomUUID().toString(); + + server.getRemotingService().createAcceptor("test", + "tcp://" + hostname + ":" + TEST_PORT + "?protocols=" + StompProtocolManagerFactory.STOMP_PROTOCOL_NAME + + "&temporaryAnycastPrefix=" + TEMP_QUEUE_PREFIX).start(); + + StompClientConnection conn = StompClientConnectionFactory.createClientConnection(uri); + conn.connect(defUser, defPass); + + // Subscribe first so the temp queue exists before sending + String receiptId = UUID.randomUUID().toString(); + ClientStompFrame frame = conn.createFrame(Stomp.Commands.SUBSCRIBE) + .addHeader(Stomp.Headers.Subscribe.DESTINATION, TEMP_QUEUE_PREFIX + address) + .addHeader(Stomp.Headers.Subscribe.ID, "sub-1") + .addHeader(Stomp.Headers.RECEIPT_REQUESTED, receiptId); + frame = conn.sendFrame(frame); + assertEquals(receiptId, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID)); + + send(conn, TEMP_QUEUE_PREFIX + address, null, "Hello Temp Queue", true); + + frame = conn.receiveFrame(5000); + assertNotNull(frame, "Should have received a message on the temporary queue"); + assertEquals(Stomp.Responses.MESSAGE, frame.getCommand()); + assertEquals("Hello Temp Queue", frame.getBody()); + + conn.disconnect(); + } + + // --- temporary topic (MULTICAST) tests --- + + @Test + public void testTemporaryTopicPrefixSubscribeCreatesTemporaryMulticastQueue() throws Exception { + URI uri = createStompClientUri(scheme, hostname, TEST_PORT); + String address = UUID.randomUUID().toString(); + + server.getRemotingService().createAcceptor("test", + "tcp://" + hostname + ":" + TEST_PORT + "?protocols=" + StompProtocolManagerFactory.STOMP_PROTOCOL_NAME + + "&temporaryMulticastPrefix=" + TEMP_TOPIC_PREFIX).start(); + + StompClientConnection conn = StompClientConnectionFactory.createClientConnection(uri); + conn.connect(defUser, defPass); + + String receiptId = UUID.randomUUID().toString(); + ClientStompFrame frame = conn.createFrame(Stomp.Commands.SUBSCRIBE) + .addHeader(Stomp.Headers.Subscribe.DESTINATION, TEMP_TOPIC_PREFIX + address) + .addHeader(Stomp.Headers.RECEIPT_REQUESTED, receiptId); + frame = conn.sendFrame(frame); + assertEquals(receiptId, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID)); + + AddressInfo addressInfo = server.getAddressInfo(SimpleString.of(address)); + assertNotNull(addressInfo, "Subscribing with temporaryMulticastPrefix should create an address"); + assertTrue(addressInfo.getRoutingTypes().contains(RoutingType.MULTICAST), + "Address created by temporaryMulticastPrefix should support MULTICAST routing"); + + // A temporary MULTICAST queue (with a UUID name) should be bound to the address + Collection bindings = server.getPostOffice().getDirectBindings(SimpleString.of(address)); + long tempMulticastQueueCount = bindings.stream() + .filter(b -> b instanceof QueueBinding) + .map(b -> ((QueueBinding) b).getQueue()) + .filter(q -> q.isTemporary() && q.getRoutingType() == RoutingType.MULTICAST) + .count(); + assertEquals(1, tempMulticastQueueCount, + "Exactly one temporary MULTICAST queue should be bound to the address"); + + conn.disconnect(); + } + + @Test + public void testTemporaryTopicDeletedOnDisconnect() throws Exception { + URI uri = createStompClientUri(scheme, hostname, TEST_PORT); + String address = UUID.randomUUID().toString(); + + server.getRemotingService().createAcceptor("test", + "tcp://" + hostname + ":" + TEST_PORT + "?protocols=" + StompProtocolManagerFactory.STOMP_PROTOCOL_NAME + + "&temporaryMulticastPrefix=" + TEMP_TOPIC_PREFIX).start(); + + StompClientConnection conn = StompClientConnectionFactory.createClientConnection(uri); + conn.connect(defUser, defPass); + + String receiptId = UUID.randomUUID().toString(); + ClientStompFrame frame = conn.createFrame(Stomp.Commands.SUBSCRIBE) + .addHeader(Stomp.Headers.Subscribe.DESTINATION, TEMP_TOPIC_PREFIX + address) + .addHeader(Stomp.Headers.RECEIPT_REQUESTED, receiptId); + frame = conn.sendFrame(frame); + assertEquals(receiptId, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID)); + assertNotNull(server.getAddressInfo(SimpleString.of(address))); + + conn.disconnect(); + + Wait.assertTrue("Temporary MULTICAST address should be deleted after client disconnects", + () -> server.getAddressInfo(SimpleString.of(address)) == null); + } + + @Test + public void testSendReceiveViaTemporaryTopic() throws Exception { + URI uri = createStompClientUri(scheme, hostname, TEST_PORT); + String address = UUID.randomUUID().toString(); + + server.getRemotingService().createAcceptor("test", + "tcp://" + hostname + ":" + TEST_PORT + "?protocols=" + StompProtocolManagerFactory.STOMP_PROTOCOL_NAME + + "&temporaryMulticastPrefix=" + TEMP_TOPIC_PREFIX).start(); + + StompClientConnection conn = StompClientConnectionFactory.createClientConnection(uri); + conn.connect(defUser, defPass); + + // Subscribe first so the temp queue exists before sending + String receiptId = UUID.randomUUID().toString(); + ClientStompFrame frame = conn.createFrame(Stomp.Commands.SUBSCRIBE) + .addHeader(Stomp.Headers.Subscribe.DESTINATION, TEMP_TOPIC_PREFIX + address) + .addHeader(Stomp.Headers.Subscribe.ID, "sub-1") + .addHeader(Stomp.Headers.RECEIPT_REQUESTED, receiptId); + frame = conn.sendFrame(frame); + assertEquals(receiptId, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID)); + + send(conn, TEMP_TOPIC_PREFIX + address, null, "Hello Temp Topic", true); + + frame = conn.receiveFrame(5000); + assertNotNull(frame, "Should have received a message on the temporary topic"); + assertEquals(Stomp.Responses.MESSAGE, frame.getCommand()); + assertEquals("Hello Temp Topic", frame.getBody()); + + conn.disconnect(); + } + + // --- combined prefix tests --- + + @Test + public void testBothTemporaryPrefixesOnSameAcceptor() throws Exception { + URI uri = createStompClientUri(scheme, hostname, TEST_PORT); + String queueAddress = UUID.randomUUID().toString(); + String topicAddress = UUID.randomUUID().toString(); + + server.getRemotingService().createAcceptor("test", + "tcp://" + hostname + ":" + TEST_PORT + "?protocols=" + StompProtocolManagerFactory.STOMP_PROTOCOL_NAME + + "&temporaryAnycastPrefix=" + TEMP_QUEUE_PREFIX + + "&temporaryMulticastPrefix=" + TEMP_TOPIC_PREFIX).start(); + + StompClientConnection conn = StompClientConnectionFactory.createClientConnection(uri); + conn.connect(defUser, defPass); + + // Subscribe to a temp-queue destination + String receiptId1 = UUID.randomUUID().toString(); + ClientStompFrame frame = conn.createFrame(Stomp.Commands.SUBSCRIBE) + .addHeader(Stomp.Headers.Subscribe.DESTINATION, TEMP_QUEUE_PREFIX + queueAddress) + .addHeader(Stomp.Headers.Subscribe.ID, "sub-queue") + .addHeader(Stomp.Headers.RECEIPT_REQUESTED, receiptId1); + frame = conn.sendFrame(frame); + assertEquals(receiptId1, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID)); + + org.apache.activemq.artemis.core.server.Queue queue = server.locateQueue(SimpleString.of(queueAddress)); + assertNotNull(queue, "Temp ANYCAST queue should be created"); + assertTrue(queue.isTemporary()); + assertEquals(RoutingType.ANYCAST, queue.getRoutingType()); + + // Subscribe to a temp-topic destination + String receiptId2 = UUID.randomUUID().toString(); + frame = conn.createFrame(Stomp.Commands.SUBSCRIBE) + .addHeader(Stomp.Headers.Subscribe.DESTINATION, TEMP_TOPIC_PREFIX + topicAddress) + .addHeader(Stomp.Headers.Subscribe.ID, "sub-topic") + .addHeader(Stomp.Headers.RECEIPT_REQUESTED, receiptId2); + frame = conn.sendFrame(frame); + assertEquals(receiptId2, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID)); + + AddressInfo addressInfo = server.getAddressInfo(SimpleString.of(topicAddress)); + assertNotNull(addressInfo, "Temp MULTICAST address should be created"); + assertTrue(addressInfo.getRoutingTypes().contains(RoutingType.MULTICAST)); + + conn.disconnect(); + + Wait.assertTrue("Temp ANYCAST queue should be deleted after disconnect", + () -> server.locateQueue(SimpleString.of(queueAddress)) == null); + Wait.assertTrue("Temp MULTICAST address should be deleted after disconnect", + () -> server.getAddressInfo(SimpleString.of(topicAddress)) == null); + } + + @Test + public void testNonTempSubscriptionStillWorksWithTemporaryPrefixConfigured() throws Exception { + URI uri = createStompClientUri(scheme, hostname, TEST_PORT); + + server.getRemotingService().createAcceptor("test", + "tcp://" + hostname + ":" + TEST_PORT + "?protocols=" + StompProtocolManagerFactory.STOMP_PROTOCOL_NAME + + "&temporaryAnycastPrefix=" + TEMP_QUEUE_PREFIX + + "&anycastPrefix=/queue/").start(); + + StompClientConnection conn = StompClientConnectionFactory.createClientConnection(uri); + conn.connect(defUser, defPass); + + // Subscribe to the regular pre-existing anycast queue using the anycast prefix + String receiptId = UUID.randomUUID().toString(); + ClientStompFrame frame = conn.createFrame(Stomp.Commands.SUBSCRIBE) + .addHeader(Stomp.Headers.Subscribe.DESTINATION, "/queue/" + getQueueName()) + .addHeader(Stomp.Headers.Subscribe.ID, "sub-regular") + .addHeader(Stomp.Headers.RECEIPT_REQUESTED, receiptId); + frame = conn.sendFrame(frame); + assertEquals(receiptId, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID), + "Regular subscription with anycastPrefix should still work alongside temporaryAnycastPrefix"); + + send(conn, "/queue/" + getQueueName(), null, "Hello Regular", true); + + frame = conn.receiveFrame(5000); + assertNotNull(frame); + assertEquals(Stomp.Responses.MESSAGE, frame.getCommand()); + assertEquals("Hello Regular", frame.getBody()); + + // Verify the regular queue was NOT created as temporary + org.apache.activemq.artemis.core.server.Queue queue = server.locateQueue(SimpleString.of(getQueueName())); + assertNotNull(queue); + assertFalse(queue.isTemporary(), "Regular queue should not be temporary"); + + conn.disconnect(); + } +}