Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ public static String getMirrorAddress(String connectionName) {

private final Map<SimpleString, RoutingType> prefixes = new HashMap<>();

private final Map<SimpleString, RoutingType> temporaryPrefixes = new HashMap<>();

/**
* minLargeMessageSize determines when a message should be considered as large. minLargeMessageSize = -1 basically
* disables large message control over AMQP.
Expand Down Expand Up @@ -391,11 +393,32 @@ public void setMulticastPrefix(String multicastPrefix) {
}
}

@Override
public void setTemporaryAnycastPrefix(String temporaryAnycastPrefix) {
for (String prefix : temporaryAnycastPrefix.split(",")) {
prefixes.put(SimpleString.of(prefix), RoutingType.ANYCAST);
temporaryPrefixes.put(SimpleString.of(prefix), RoutingType.ANYCAST);
}
}

@Override
public void setTemporaryMulticastPrefix(String temporaryMulticastPrefix) {
for (String prefix : temporaryMulticastPrefix.split(",")) {
prefixes.put(SimpleString.of(prefix), RoutingType.MULTICAST);
temporaryPrefixes.put(SimpleString.of(prefix), RoutingType.MULTICAST);
}
}

@Override
public Map<SimpleString, RoutingType> getPrefixes() {
return prefixes;
}

@Override
public Map<SimpleString, RoutingType> getTemporaryPrefixes() {
return temporaryPrefixes;
}

@Override
public AMQPRoutingHandler getRoutingHandler() {
return routingHandler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ public OpenWireProtocolManager setOpenwireMaxPacketChunkSize(int openwireMaxPack

private final Map<SimpleString, RoutingType> prefixes = new HashMap<>();

private final Map<SimpleString, RoutingType> temporaryPrefixes = new HashMap<>();

private final List<OpenWireInterceptor> incomingInterceptors = new ArrayList<>();
private final List<OpenWireInterceptor> outgoingInterceptors = new ArrayList<>();

Expand Down Expand Up @@ -686,11 +688,32 @@ public void setMulticastPrefix(String multicastPrefix) {
}
}

@Override
public void setTemporaryAnycastPrefix(String temporaryAnycastPrefix) {
for (String prefix : temporaryAnycastPrefix.split(",")) {
prefixes.put(SimpleString.of(prefix), RoutingType.ANYCAST);
temporaryPrefixes.put(SimpleString.of(prefix), RoutingType.ANYCAST);
}
}

@Override
public void setTemporaryMulticastPrefix(String temporaryMulticastPrefix) {
for (String prefix : temporaryMulticastPrefix.split(",")) {
prefixes.put(SimpleString.of(prefix), RoutingType.MULTICAST);
temporaryPrefixes.put(SimpleString.of(prefix), RoutingType.MULTICAST);
}
}

@Override
public Map<SimpleString, RoutingType> getPrefixes() {
return prefixes;
}

@Override
public Map<SimpleString, RoutingType> getTemporaryPrefixes() {
return temporaryPrefixes;
}

@Override
public void setSecurityDomain(String securityDomain) {
this.securityDomain = securityDomain;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -549,11 +549,14 @@ StompPostReceiptFunction subscribe(String destination,
String durableSubscriptionName,
boolean noLocal,
RoutingType subscriptionType,
Integer consumerWindowSize) throws ActiveMQStompException {
Integer consumerWindowSize,
RoutingType temporaryRoutingType) throws ActiveMQStompException {
validateSelector(selector);
autoCreateDestinationIfPossible(destination, subscriptionType);
checkDestination(destination);
checkRoutingSemantics(destination, subscriptionType);
if (temporaryRoutingType == null) {
autoCreateDestinationIfPossible(destination, subscriptionType);
checkDestination(destination);
checkRoutingSemantics(destination, subscriptionType);
}
if (noLocal) {
String noLocalFilter = "(" + CONNECTION_ID_PROPERTY_NAME_STRING + " <> '" + getID().toString() + "' OR " + CONNECTION_ID_PROPERTY_NAME_STRING + " IS NULL)";
if (selector == null) {
Expand All @@ -578,7 +581,7 @@ StompPostReceiptFunction subscribe(String destination,
}

try {
return manager.subscribe(this, subscriptionID, durableSubscriptionName, destination, selector, ack, noLocal, consumerWindowSize);
return manager.subscribe(this, subscriptionID, durableSubscriptionName, destination, selector, ack, noLocal, consumerWindowSize, temporaryRoutingType);
} catch (ActiveMQStompException e) {
throw e;
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.core.io.IOCallback;
Expand Down Expand Up @@ -358,14 +359,15 @@ public StompPostReceiptFunction subscribe(StompConnection connection,
String selector,
String ack,
boolean noLocal,
Integer consumerWindowSize) throws Exception {
Integer consumerWindowSize,
RoutingType temporaryRoutingType) throws Exception {
StompSession stompSession = getSession(connection);
if (stompSession.containsSubscription(subscriptionID)) {
throw new ActiveMQStompException(connection, "There already is a subscription for: " + subscriptionID +
". Either use unique subscription IDs or do not create multiple subscriptions for the same destination");
}
long consumerID = server.getStorageManager().generateID();
return stompSession.addSubscription(consumerID, subscriptionID, connection.getClientID(), durableSubscriptionName, destination, selector, ack, noLocal, consumerWindowSize);
return stompSession.addSubscription(consumerID, subscriptionID, connection.getClientID(), durableSubscriptionName, destination, selector, ack, noLocal, consumerWindowSize, temporaryRoutingType);
}

public void unsubscribe(StompConnection connection,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,8 @@ public StompPostReceiptFunction addSubscription(long consumerID,
String selector,
String ack,
boolean noLocal,
Integer consumerWindowSize) throws Exception {
Integer consumerWindowSize,
RoutingType temporaryRoutingType) throws Exception {
SimpleString address = SimpleString.of(destination);
SimpleString queueName = SimpleString.of(destination);
SimpleString selectorSimple = SimpleString.of(selector);
Expand All @@ -327,26 +328,42 @@ public StompPostReceiptFunction addSubscription(long consumerID,
finalConsumerWindowSize = ConfigurationHelper.getIntProperty(TransportConstants.STOMP_CONSUMER_WINDOW_SIZE, ConfigurationHelper.getIntProperty(TransportConstants.STOMP_CONSUMERS_CREDIT, TransportConstants.STOMP_DEFAULT_CONSUMER_WINDOW_SIZE, connection.getAcceptorUsed().getConfiguration()), connection.getAcceptorUsed().getConfiguration());
}

Set<RoutingType> routingTypes = manager.getServer().getAddressInfo(getCoreSession().removePrefix(address)).getRoutingTypes();
boolean multicast = routingTypes.size() == 1 && routingTypes.contains(RoutingType.MULTICAST);
// if the destination is FQQN then the queue will have already been created
if (multicast && !CompositeAddress.isFullyQualified(destination)) {
// subscribes to a topic
if (durableSubscriptionName != null) {
if (clientID == null) {
throw BUNDLE.missingClientID();
boolean multicast;
if (temporaryRoutingType != null) {
multicast = temporaryRoutingType == RoutingType.MULTICAST;
if (multicast) {
queueName = UUIDGenerator.getInstance().generateSimpleStringUUID();
session.createQueue(QueueConfiguration.of(queueName).setAddress(address).setRoutingType(RoutingType.MULTICAST).setFilterString(selectorSimple).setDurable(false).setTemporary(true));
} else {
try {
session.createQueue(QueueConfiguration.of(address).setAddress(address).setRoutingType(RoutingType.ANYCAST).setFilterString(selectorSimple).setDurable(false).setTemporary(true));
} catch (ActiveMQQueueExistsException e) {
// queue may already exist if a sender created it first
}
queueName = SimpleString.of(clientID + "." + durableSubscriptionName);
if (manager.getServer().locateQueue(queueName) == null) {
try {
session.createQueue(QueueConfiguration.of(queueName).setAddress(address).setFilterString(selectorSimple));
} catch (ActiveMQQueueExistsException e) {
// ignore; can be caused by concurrent durable subscribers
queueName = address;
}
} else {
Set<RoutingType> routingTypes = manager.getServer().getAddressInfo(getCoreSession().removePrefix(address)).getRoutingTypes();
multicast = routingTypes.size() == 1 && routingTypes.contains(RoutingType.MULTICAST);
// if the destination is FQQN then the queue will have already been created
if (multicast && !CompositeAddress.isFullyQualified(destination)) {
// subscribes to a topic
if (durableSubscriptionName != null) {
if (clientID == null) {
throw BUNDLE.missingClientID();
}
queueName = SimpleString.of(clientID + "." + durableSubscriptionName);
if (manager.getServer().locateQueue(queueName) == null) {
try {
session.createQueue(QueueConfiguration.of(queueName).setAddress(address).setFilterString(selectorSimple));
} catch (ActiveMQQueueExistsException e) {
// ignore; can be caused by concurrent durable subscribers
}
}
} else {
queueName = UUIDGenerator.getInstance().generateSimpleStringUUID();
session.createQueue(QueueConfiguration.of(queueName).setAddress(address).setFilterString(selectorSimple).setDurable(false).setTemporary(true));
}
} else {
queueName = UUIDGenerator.getInstance().generateSimpleStringUUID();
session.createQueue(QueueConfiguration.of(queueName).setAddress(address).setFilterString(selectorSimple).setDurable(false).setTemporary(true));
}
}
if (noLocal) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ScheduledExecutorService;

Expand Down Expand Up @@ -263,6 +264,8 @@ public StompFrame onAbort(StompFrame request) {
}

public StompPostReceiptFunction onSubscribe(StompFrame frame) throws Exception {
String rawDestination = frame.getHeader(Headers.Subscribe.DESTINATION);
RoutingType temporaryRoutingType = getTemporaryRoutingType(rawDestination);
String destination = getDestination(frame);

String selector = frame.getHeader(Stomp.Headers.Subscribe.SELECTOR);
Expand All @@ -288,7 +291,19 @@ public StompPostReceiptFunction onSubscribe(StompFrame frame) throws Exception {
} else if (frame.hasHeader(Headers.Subscribe.ACTIVEMQ_PREFETCH_SIZE)) {
consumerWindowSize = Integer.parseInt(frame.getHeader(Stomp.Headers.Subscribe.ACTIVEMQ_PREFETCH_SIZE));
}
return connection.subscribe(destination, selector, ack, id, durableSubscriptionName, noLocal, routingType, consumerWindowSize);
return connection.subscribe(destination, selector, ack, id, durableSubscriptionName, noLocal, routingType, consumerWindowSize, temporaryRoutingType);
}

private RoutingType getTemporaryRoutingType(String rawDestination) {
if (rawDestination != null) {
SimpleString dest = SimpleString.of(rawDestination);
for (Map.Entry<SimpleString, RoutingType> entry : connection.getManager().getTemporaryPrefixes().entrySet()) {
if (dest.startsWith(entry.getKey())) {
return entry.getValue();
}
}
}
return null;
}

public String getDestination(StompFrame request) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ public class CoreProtocolManager implements ProtocolManager<Interceptor, ActiveM

private final Map<SimpleString, RoutingType> prefixes = new HashMap<>();

private final Map<SimpleString, RoutingType> temporaryPrefixes = new HashMap<>();

private String securityDomain;

private final ActiveMQRoutingHandler routingHandler;
Expand Down Expand Up @@ -227,11 +229,32 @@ public void setMulticastPrefix(String multicastPrefix) {
}
}

@Override
public void setTemporaryAnycastPrefix(String temporaryAnycastPrefix) {
for (String prefix : temporaryAnycastPrefix.split(",")) {
prefixes.put(SimpleString.of(prefix), RoutingType.ANYCAST);
temporaryPrefixes.put(SimpleString.of(prefix), RoutingType.ANYCAST);
}
}

@Override
public void setTemporaryMulticastPrefix(String temporaryMulticastPrefix) {
for (String prefix : temporaryMulticastPrefix.split(",")) {
prefixes.put(SimpleString.of(prefix), RoutingType.MULTICAST);
temporaryPrefixes.put(SimpleString.of(prefix), RoutingType.MULTICAST);
}
}

@Override
public Map<SimpleString, RoutingType> getPrefixes() {
return prefixes;
}

@Override
public Map<SimpleString, RoutingType> getTemporaryPrefixes() {
return temporaryPrefixes;
}

@Override
public void setSecurityDomain(String securityDomain) {
this.securityDomain = securityDomain;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public abstract class AbstractProtocolManager<P, I extends BaseInterceptor<P>, C

private final Map<SimpleString, RoutingType> prefixes = new HashMap<>();

private final Map<SimpleString, RoutingType> temporaryPrefixes = new HashMap<>();

private String securityDomain;

protected String invokeInterceptors(final List<I> interceptors, final P message, final C connection) {
Expand Down Expand Up @@ -65,11 +67,32 @@ public void setMulticastPrefix(String multicastPrefix) {
}
}

@Override
public void setTemporaryAnycastPrefix(String temporaryAnycastPrefix) {
for (String prefix : temporaryAnycastPrefix.split(",")) {
prefixes.put(SimpleString.of(prefix), RoutingType.ANYCAST);
temporaryPrefixes.put(SimpleString.of(prefix), RoutingType.ANYCAST);
}
}

@Override
public void setTemporaryMulticastPrefix(String temporaryMulticastPrefix) {
for (String prefix : temporaryMulticastPrefix.split(",")) {
prefixes.put(SimpleString.of(prefix), RoutingType.MULTICAST);
temporaryPrefixes.put(SimpleString.of(prefix), RoutingType.MULTICAST);
}
}

@Override
public Map<SimpleString, RoutingType> getPrefixes() {
return prefixes;
}

@Override
public Map<SimpleString, RoutingType> getTemporaryPrefixes() {
return temporaryPrefixes;
}

@Override
public String getSecurityDomain() {
return securityDomain;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,14 @@ default void removeHandler(String name) {

void setMulticastPrefix(String multicastPrefix);

void setTemporaryAnycastPrefix(String temporaryAnycastPrefix);

void setTemporaryMulticastPrefix(String temporaryMulticastPrefix);

Map<SimpleString, RoutingType> getPrefixes();

Map<SimpleString, RoutingType> getTemporaryPrefixes();

void setSecurityDomain(String securityDomain);

String getSecurityDomain();
Expand Down
Loading