Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -918,7 +918,12 @@ public void addDestination(DestinationInfo info) throws Exception {
return;
}

SimpleString qName = SimpleString.of(dest.getPhysicalName());
// Use normalized core name for broker lookup / auto-create, but keep the
// original ActiveMQDestination for advisories / state (back-compat).
final String coreLookupName = dest.isQueue()
? org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil.toCoreProduceAddress(dest)
: dest.getPhysicalName();
SimpleString qName = SimpleString.of(coreLookupName);

AutoCreateResult autoCreateResult = internalSession.checkAutoCreate(QueueConfiguration.of(qName)
.setRoutingType(dest.isQueue() ? RoutingType.ANYCAST : RoutingType.MULTICAST)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConstants;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.ServerConsumer;
Expand Down Expand Up @@ -169,8 +170,7 @@ public void init(SlowConsumerDetectionListener slowConsumerDetectionListener, lo
}
}

SimpleString destinationName = SimpleString.of(session.convertWildcard(openwireDestination));

SimpleString destinationName = SimpleString.of(OpenWireUtil.toCoreConsumePattern(openwireDestination, session.getCoreServer()));
if (openwireDestination.isTopic()) {
SimpleString queueName = createTopicSubscription(info.isDurable(), info.getClientId(), destinationName.toString(), info.getSubscriptionName(), selector, destinationName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil.OPENWIRE_WILDCARD;

public class AMQSession implements SessionCallback {

private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

// ConsumerID is generated inside the session, 0, 1, 2, ... as many consumers as you have on the session
Expand Down Expand Up @@ -142,13 +141,11 @@ public void initialize() {

}


@Override
public boolean supportsDirectDelivery() {
return false;
}


@Override
public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) {
if (consumer.getProtocolData() != null) {
Expand All @@ -174,16 +171,16 @@ public List<AMQConsumer> createConsumer(ConsumerInfo info,

for (ActiveMQDestination openWireDest : dests) {
boolean isInternalAddress = false;
if (AdvisorySupport.isAdvisoryTopic(dest)) {
if (AdvisorySupport.isAdvisoryTopic(openWireDest)) {
if (!connection.isSuppportAdvisory()) {
continue;
}
isInternalAddress = connection.isSuppressInternalManagementObjects();
}
if (openWireDest.isQueue()) {
openWireDest = protocolManager.virtualTopicConsumerToFQQN(openWireDest);
SimpleString queueName = SimpleString.of(convertWildcard(openWireDest));

final String consumeName = OpenWireUtil.toCoreConsumePattern(openWireDest, server);
final SimpleString queueName = SimpleString.of(consumeName);
if (!checkAutoCreateQueue(queueName, openWireDest.isTemporary(), OpenWireUtil.extractFilterStringOrNull(info, openWireDest))) {
throw new InvalidDestinationException("Destination doesn't exist: " + queueName);
}
Expand Down Expand Up @@ -364,8 +361,10 @@ public void send(final ProducerInfo producerInfo,
messageSend.setBrokerInTime(System.currentTimeMillis());

final ActiveMQDestination destination = messageSend.getDestination();
final ActiveMQDestination producerDestination = producerInfo.getDestination();
final ActiveMQDestination effectiveDestination = (destination != null) ? destination : producerDestination;

if (producerInfo.getDestination() == null) {
if (producerDestination == null) {
// a named producer will have its target destination checked on create but an
// anonymous producer can send to different addresses on each send so we need to
// check here before going into message conversion and pre-dispatch stages before
Expand All @@ -375,9 +374,9 @@ public void send(final ProducerInfo producerInfo,

final ActiveMQDestination[] actualDestinations;
final int actualDestinationsCount;
if (destination.isComposite()) {
actualDestinations = destination.getCompositeDestinations();
messageSend.setOriginalDestination(destination);
if (effectiveDestination.isComposite()) {
actualDestinations = effectiveDestination.getCompositeDestinations();
messageSend.setOriginalDestination(effectiveDestination);
actualDestinationsCount = actualDestinations.length;
} else {
actualDestinations = null;
Expand Down Expand Up @@ -408,15 +407,15 @@ public void send(final ProducerInfo producerInfo,
}

for (int i = 0; i < actualDestinationsCount; i++) {
final ActiveMQDestination dest = actualDestinations != null ? actualDestinations[i] : destination;
final String physicalName = dest.getPhysicalName();
final SimpleString address = SimpleString.of(physicalName, coreMessageObjectPools.getAddressStringSimpleStringPool());
final ActiveMQDestination dest = (actualDestinations != null) ? actualDestinations[i] : effectiveDestination;
final String normalizedName = OpenWireUtil.toCoreProduceAddress(dest);
final SimpleString address = SimpleString.of(normalizedName, coreMessageObjectPools.getAddressStringSimpleStringPool());
//the last coreMsg could be directly the original one -> it avoid 1 copy if actualDestinations > 1 and ANY copy if actualDestinations == 1
final org.apache.activemq.artemis.api.core.Message coreMsg = (i == actualDestinationsCount - 1) ? originalCoreMsg : originalCoreMsg.copy();
coreMsg.setAddress(address);

if (dest.isQueue()) {
checkCachedExistingQueues(address, physicalName, dest.isTemporary());
checkCachedExistingQueues(address, normalizedName, dest.isTemporary());
coreMsg.setRoutingType(RoutingType.ANYCAST);
} else {
coreMsg.setRoutingType(RoutingType.MULTICAST);
Expand Down Expand Up @@ -446,7 +445,6 @@ public void send(final ProducerInfo producerInfo,
}
}
}

private void sendShouldBlockProducer(final ProducerInfo producerInfo,
final Message messageSend,
final boolean sendProducerAck,
Expand Down Expand Up @@ -531,14 +529,6 @@ private void blockConnection() {
connection.blockConnection();
}

public String convertWildcard(ActiveMQDestination openWireDest) {
if (openWireDest.isTemporary() || AdvisorySupport.isAdvisoryTopic(openWireDest)) {
return openWireDest.getPhysicalName();
} else {
return OPENWIRE_WILDCARD.convert(openWireDest.getPhysicalName(), server.getConfiguration().getWildcardConfiguration());
}
}

public ServerSession getCoreSession() {
return this.coreSession;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,15 @@
*/
package org.apache.activemq.artemis.core.protocol.openwire.util;

import java.util.function.Function;

import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
Expand All @@ -28,13 +34,21 @@

public class OpenWireUtil {

public static final WildcardConfiguration OPENWIRE_WILDCARD = new WildcardConfiguration().setDelimiter('.').setAnyWords('>').setSingleWord('*');
private static final String SCHEME_SEPARATOR = "://";
private static final String QUEUE_SCHEME = "queue";
private static final String TOPIC_SCHEME = "topic";
private static final String CONSUMER_PREFIX = "Consumer.";
private static final String VIRTUAL_TOPIC_MARKER = ".VirtualTopic.";

public static final WildcardConfiguration OPENWIRE_WILDCARD = new WildcardConfiguration().setDelimiter('.')
.setAnyWords('>')
.setSingleWord('*');

public static final String SELECTOR_AWARE_OPTION = "selectorAware";

public static String extractFilterStringOrNull(final ConsumerInfo info, final ActiveMQDestination openWireDest) {
if (info.getSelector() != null) {
if (openWireDest.getOptions() != null) {
if (openWireDest.getOptions() != null) {
if (Boolean.valueOf(openWireDest.getOptions().get(SELECTOR_AWARE_OPTION))) {
return info.getSelector();
}
Expand Down Expand Up @@ -70,4 +84,116 @@ public static XidImpl toXID(TransactionId xaXid) {
public static XidImpl toXID(XATransactionId xaXid) {
return new XidImpl(xaXid.getBranchQualifier(), xaXid.getFormatId(), xaXid.getGlobalTransactionId());
}

/**
* Converts an ActiveMQ destination to a core produce address.
* This method handles different types of destinations:
* - Returns the physical name for temporary or advisory destinations
* - For FQQN addresses, escapes backslashes in the address part only
* - For other addresses, returns the stripped and escaped address
*
* @param destination the ActiveMQ destination to convert
* @return the core produce address, or null if the destination is null
*/
public static String toCoreProduceAddress(ActiveMQDestination destination) {
if (destination == null) {
return null;
}

if (destination.isTemporary() || AdvisorySupport.isAdvisoryTopic(destination)) {
return destination.getPhysicalName();
}

final String strippedAddress = stripAddressScheme(destination.getPhysicalName(), destination);

if (isFqqn(strippedAddress)) {
return handleFqqn(strippedAddress, OpenWireUtil::escapeBackslashes);
}

return escapeBackslashes(strippedAddress);
}

/**
* Converts an ActiveMQ destination to a core consume pattern.
* This method handles different types of destinations:
* - Returns the physical name for temporary or advisory destinations
* - For FQQN addresses, converts wildcards in the address part only
* - For virtual topic consumer queues, returns the stripped address
* - For other addresses, returns the wildcard-converted pattern
*
* @param destination the ActiveMQ destination to convert
* @param server the ActiveMQ server for wildcard configuration
* @return the core consume pattern, or null if the destination is null
*/
public static String toCoreConsumePattern(ActiveMQDestination destination, ActiveMQServer server) {
if (destination == null) {
return null;
}

if (destination.isTemporary() || AdvisorySupport.isAdvisoryTopic(destination)) {
return destination.getPhysicalName();
}

String strippedAddress = stripAddressScheme(destination.getPhysicalName(), destination);

if (isFqqn(strippedAddress)) {
return handleFqqn(strippedAddress, addr -> convertWildcard(addr, server));
}

if (destination.isQueue() && isVirtualTopicConsumerName(strippedAddress)) {
return strippedAddress;
}

return convertWildcard(strippedAddress, server);
}

private static String stripAddressScheme(String address, ActiveMQDestination destination) {
if (address == null || !address.contains(SCHEME_SEPARATOR)) {
return address;
}

int schemeIndex = address.indexOf(SCHEME_SEPARATOR);
String scheme = address.substring(0, schemeIndex).toLowerCase();

if (destination == null || (destination.isQueue() && QUEUE_SCHEME.equals(scheme)) || (destination.isTopic() && TOPIC_SCHEME.equals(scheme))) {
return address.substring(schemeIndex + SCHEME_SEPARATOR.length());
}

return address;
}

private static boolean isVirtualTopicConsumerName(String address) {
if (address == null || isFqqn(address)) {
return false;
}

String normalizedAddr = stripAddressScheme(address, null);
return normalizedAddr.startsWith(CONSUMER_PREFIX) && normalizedAddr.contains(VIRTUAL_TOPIC_MARKER);
}

private static String handleFqqn(String address, Function<String, String> addressConverter) {
SimpleString fullAddress = SimpleString.of(address);
// Extract address and queue parts from the FQQN
SimpleString addr = CompositeAddress.extractAddressName(fullAddress);
SimpleString queue = CompositeAddress.extractQueueName(fullAddress);
// Apply the converter function to the address part
String convertedAddr = addressConverter.apply(addr.toString());
// Reconstruct the FQQN with the converted address
return CompositeAddress.toFullyQualified(SimpleString.of(convertedAddr), queue).toString();
}

private static String escapeBackslashes(String input) {
return input == null ? null : input.replace("\\", "\\\\");
}

private static String convertWildcard(String address, ActiveMQServer server) {
return OPENWIRE_WILDCARD.convert(address, server.getConfiguration().getWildcardConfiguration());
}

private static boolean isFqqn(String address) {
if (address != null) {
return CompositeAddress.isFullyQualified(SimpleString.of(address));
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,10 @@
*/
package org.apache.activemq.artemis.core.protocol.openwire.amq;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.concurrent.ScheduledExecutorService;

import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
Expand All @@ -39,7 +34,6 @@
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.utils.UUID;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConsumerInfo;
Expand All @@ -49,6 +43,13 @@
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

import java.util.concurrent.ScheduledExecutorService;

import static org.apache.activemq.artemis.core.config.WildcardConfiguration.DEFAULT_WILDCARD_CONFIGURATION;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class AMQConsumerTest {
final OpenWireFormatFactory formatFactory = new OpenWireFormatFactory();
final WireFormat openWireFormat = formatFactory.createWireFormat();
Expand Down Expand Up @@ -87,7 +88,10 @@ private AMQConsumer getConsumer(int prefetchSize) throws Exception {
UUID nodeId = UUIDGenerator.getInstance().generateUUID();
ActiveMQServer coreServer = Mockito.mock(ActiveMQServer.class);
NodeManager nodeManager = Mockito.mock(NodeManager.class);
Configuration configuration = Mockito.mock(Configuration.class);
Mockito.when(configuration.getWildcardConfiguration()).thenReturn(DEFAULT_WILDCARD_CONFIGURATION);
Mockito.when(coreServer.getNodeManager()).thenReturn(nodeManager);
Mockito.when(coreServer.getConfiguration()).thenReturn(configuration);
Mockito.when(nodeManager.getUUID()).thenReturn(nodeId);

ServerSession coreSession = Mockito.mock(ServerSession.class);
Expand All @@ -100,7 +104,6 @@ private AMQConsumer getConsumer(int prefetchSize) throws Exception {
Mockito.when(session.getConnection()).thenReturn(Mockito.mock(OpenWireConnection.class));
Mockito.when(session.getCoreServer()).thenReturn(coreServer);
Mockito.when(session.getCoreSession()).thenReturn(coreSession);
Mockito.when(session.convertWildcard(ArgumentMatchers.any(ActiveMQDestination.class))).thenReturn("");

ConsumerInfo info = new ConsumerInfo();
info.setPrefetchSize(prefetchSize);
Expand Down
Loading