From d8f0981068a452167ce305ce1ce36f589d57ed9b Mon Sep 17 00:00:00 2001 From: Matt Pavlovich Date: Tue, 26 Aug 2025 09:39:06 -0500 Subject: [PATCH] [AMQ-9692] Support destination gc sweep of destinations with only wildcard consumers --- .../broker/region/AbstractRegion.java | 13 +++-- .../broker/region/BaseDestination.java | 53 +++++++++++++++---- .../activemq/broker/region/Destination.java | 1 + .../broker/region/DestinationFilter.java | 5 ++ .../activemq/broker/region/TempQueue.java | 2 +- .../broker/region/TempQueueRegion.java | 2 +- .../broker/region/policy/PolicyEntry.java | 12 +++++ .../activemq/java/JavaPolicyEntryTest.java | 48 +++++++++-------- .../broker/region/DestinationGCTest.java | 32 +++++++++++ 9 files changed, 128 insertions(+), 40 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java index af77b1d4498..90acbe8d416 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java @@ -260,16 +260,21 @@ protected List addSubscriptionsForDestination(ConnectionContext co } @Override - public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) - throws Exception { - + public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception { // No timeout.. then try to shut down right way, fails if there are // current subscribers. if (timeout == 0) { for (Iterator iter = subscriptions.values().iterator(); iter.hasNext();) { Subscription sub = iter.next(); if (sub.matches(destination) ) { - throw new JMSException("Destination: " + destination + " still has an active subscription: " + sub); + if(sub.isWildcard()) { + var dest = destinations.get(destination); + if(dest != null && dest.isGcWithOnlyWildcardConsumers()) { + continue; + } + } else { + throw new JMSException("Destination: " + destination + " still has an active subscription: " + sub); + } } } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java index 82ed8784a26..074ad2d866e 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java @@ -105,7 +105,8 @@ public abstract class BaseDestination implements Destination { private long inactiveTimeoutBeforeGC = DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC; private boolean gcIfInactive; private boolean gcWithNetworkConsumers; - private long lastActiveTime=0l; + private boolean gcWithOnlyWildcardConsumers; + private long lastActiveTime = 0L; private boolean reduceMemoryFootprint = false; protected final Scheduler scheduler; private boolean disposed = false; @@ -311,12 +312,24 @@ public final MessageStore getMessageStore() { @Override public boolean isActive() { - boolean isActive = destinationStatistics.getConsumers().getCount() > 0 || - destinationStatistics.getProducers().getCount() > 0; - if (isActive && isGcWithNetworkConsumers() && destinationStatistics.getConsumers().getCount() > 0) { - isActive = hasRegularConsumers(getConsumers()); + if (destinationStatistics.getProducers().getCount() > 0) { + return true; } - return isActive; + + var destinationActive = true; + if (destinationStatistics.getConsumers().getCount() > 0) { + if (isGcWithNetworkConsumers()) { + destinationActive = hasRegularConsumers(getConsumers()); + } + + if (destinationActive && + isGcWithOnlyWildcardConsumers()) { + destinationActive = !getConsumers().stream().allMatch(Subscription::isWildcard); + } + } else { + destinationActive = false; + } + return destinationActive; } @Override @@ -824,19 +837,37 @@ public boolean isGcWithNetworkConsumers() { return gcWithNetworkConsumers; } + /** + * Indicate if it is ok to gc destinations that have only wildcard consumers + * @param gcWithOnlyWildcardConsumers + */ + public void setGcWithOnlyWildcardConsumers(boolean gcWithOnlyWildcardConsumers) { + this.gcWithOnlyWildcardConsumers = gcWithOnlyWildcardConsumers; + } + + public boolean isGcWithOnlyWildcardConsumers() { + return gcWithOnlyWildcardConsumers; + } + @Override public void markForGC(long timeStamp) { - if (isGcIfInactive() && this.lastActiveTime == 0 && isActive() == false - && destinationStatistics.getMessages().getCount() == 0 && getInactiveTimeoutBeforeGC() > 0l) { + if (isGcIfInactive() + && this.lastActiveTime == 0 + && destinationStatistics.getMessages().getCount() == 0 + && getInactiveTimeoutBeforeGC() > 0L + && !isActive()) { this.lastActiveTime = timeStamp; } } @Override public boolean canGC() { - boolean result = false; - final long currentLastActiveTime = this.lastActiveTime; - if (isGcIfInactive() && currentLastActiveTime != 0l && destinationStatistics.getMessages().getCount() == 0L ) { + var result = false; + final var currentLastActiveTime = this.lastActiveTime; + if (isGcIfInactive() + && currentLastActiveTime != 0L + && destinationStatistics.getMessages().getCount() == 0L + && !isActive()) { if ((System.currentTimeMillis() - currentLastActiveTime) >= getInactiveTimeoutBeforeGC()) { result = true; } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java index 22ba14894b7..2901fd0e18a 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java @@ -267,4 +267,5 @@ public interface Destination extends Service, Task, Message.MessageDestination { void setAdvancedMessageStatisticsEnabled(boolean advancedMessageStatisticsEnabled); + boolean isGcWithOnlyWildcardConsumers(); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java index 1ab96560ac2..9cf29d14b40 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java @@ -429,6 +429,11 @@ public void setAdvancedMessageStatisticsEnabled(boolean advancedMessageStatistic next.setAdvancedMessageStatisticsEnabled(advancedMessageStatisticsEnabled); } + @Override + public boolean isGcWithOnlyWildcardConsumers() { + return next.isGcWithOnlyWildcardConsumers(); + } + public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception { if (next instanceof DestinationFilter) { DestinationFilter filter = (DestinationFilter) next; diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TempQueue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TempQueue.java index 17eeb056472..40dbcb4b4bc 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TempQueue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TempQueue.java @@ -33,7 +33,7 @@ * * */ -public class TempQueue extends Queue{ +public class TempQueue extends Queue { private static final Logger LOG = LoggerFactory.getLogger(TempQueue.class); private final ActiveMQTempDestination tempDest; diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java index c6bf7320419..97c54b35ddd 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java @@ -58,7 +58,7 @@ public void removeDestination(ConnectionContext context, ActiveMQDestination des super.removeDestination(context, destination, timeout); } - + /* * For a Queue, dispatch order is imperative to match acks, so the dispatch is deferred till * the notification to ensure that the subscription chosen by the master is used. diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java index 8ae052574bc..96c236afdc6 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java @@ -99,6 +99,7 @@ public class PolicyEntry extends DestinationMapEntry { private boolean prioritizedMessages; private boolean allConsumersExclusiveByDefault; private boolean gcInactiveDestinations; + private boolean gcWithOnlyWildcardConsumers; private boolean gcWithNetworkConsumers; private long inactiveTimeoutBeforeGC = BaseDestination.DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC; private boolean reduceMemoryFootprint; @@ -263,6 +264,9 @@ public void baseUpdate(BaseDestination destination, Set includedProperti if (isUpdate("gcInactiveDestinations", includedProperties)) { destination.setGcIfInactive(isGcInactiveDestinations()); } + if (isUpdate("gcWithOnlyWildcardConsumers", includedProperties)) { + destination.setGcWithOnlyWildcardConsumers(isGcWithOnlyWildcardConsumers()); + } if (isUpdate("gcWithNetworkConsumers", includedProperties)) { destination.setGcWithNetworkConsumers(isGcWithNetworkConsumers()); } @@ -1082,6 +1086,14 @@ public void setInactiveTimeoutBeforeGC(long inactiveTimeoutBeforeGC) { this.inactiveTimeoutBeforeGC = inactiveTimeoutBeforeGC; } + public void setGcWithOnlyWildcardConsumers(boolean gcWithOnlyWildcardConsumers) { + this.gcWithOnlyWildcardConsumers = gcWithOnlyWildcardConsumers; + } + + public boolean isGcWithOnlyWildcardConsumers() { + return gcWithOnlyWildcardConsumers; + } + public void setGcWithNetworkConsumers(boolean gcWithNetworkConsumers) { this.gcWithNetworkConsumers = gcWithNetworkConsumers; } diff --git a/activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaPolicyEntryTest.java b/activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaPolicyEntryTest.java index bc92505d7e9..9c22ec9d98b 100644 --- a/activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaPolicyEntryTest.java +++ b/activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaPolicyEntryTest.java @@ -662,7 +662,7 @@ private void testAllQueuePropertiesAppliedFilter(Set properties) throws //initial config setAllDestPolicyProperties(entry, true, true, 10, - 100, 200, 1000, 400, 40, 30, true, true, 1000, true, true, + 100, 200, 1000, 400, 40, 30, true, true, true, 1000, true, true, 30, true, true, true, true, true, true, true, true, true); setAllQueuePolicyProperties(entry, 10000, true, true, true, true, 100, 100, true, true); @@ -675,7 +675,7 @@ private void testAllQueuePropertiesAppliedFilter(Set properties) throws //validate config assertAllDestPolicyProperties(getQueue("Before"), true, true, 10, - 100, 200, 1000, 400, 40, 30, true, true, 1000, true, true, + 100, 200, 1000, 400, 40, 30, true, true, true, 1000, true, true, 30, true, true, true,true, true, true, true, true, true); assertAllQueuePolicyProperties(getQueue("Before"), 10000, true, true, true, true, 100, 100, true, true); @@ -683,7 +683,7 @@ private void testAllQueuePropertiesAppliedFilter(Set properties) throws //change config setAllDestPolicyProperties(entry, false, false, 100, - 1000, 2000, 10000, 4000, 400, 300, false, false, 1000, false, false, + 1000, 2000, 10000, 4000, 400, 300, false, false, false, 1000, false, false, 300, false, false, false,false, false, false, false, false, false); setAllQueuePolicyProperties(entry, 100000, false, false, false, false, 1000, 1000, false, false); @@ -692,14 +692,14 @@ private void testAllQueuePropertiesAppliedFilter(Set properties) throws TimeUnit.SECONDS.sleep(SLEEP); assertAllDestPolicyProperties(getQueue("Before"), false, false, 100, - 1000, 2000, 10000, 4000, 400, 300, false, false, 1000, false, false, + 1000, 2000, 10000, 4000, 400, 300, false, false, false, 1000, false, false, 300, false, false, false,false, false, false, false, false, false); assertAllQueuePolicyProperties(getQueue("Before"), 100000, false, false, false, false, 1000, 1000, false, false); //check new dest assertAllDestPolicyProperties(getQueue("After"), false, false, 100, - 1000, 2000, 10000, 4000, 400, 300, false, false, 1000, false, false, + 1000, 2000, 10000, 4000, 400, 300, false, false, false, 1000, false, false, 300, false, false, false, false, false, false, false, false, false); assertAllQueuePolicyProperties(getQueue("After"), 100000, false, false, false, false, 1000, 1000, false, false); @@ -713,7 +713,7 @@ private void testAllTopicPropertiesAppliedFilter(Set properties) throws //initial config setAllDestPolicyProperties(entry, true, true, 10, - 100, 200, 1000, 400, 40, 30, true, true, 1000, true, true, + 100, 200, 1000, 400, 40, 30, true, true, true, 1000, true, true, 30, true, true, true, true, true, true, true, true, true); setAllTopicPolicyProperties(entry, 10000, true); @@ -725,14 +725,14 @@ private void testAllTopicPropertiesAppliedFilter(Set properties) throws //validate config assertAllDestPolicyProperties(getTopic("Before"), true, true, 10, - 100, 200, 1000, 400, 40, 30, true, true, 1000, true, true, + 100, 200, 1000, 400, 40, 30, true, true, true, 1000, true, true, 30, true, true, true, true, true, true, true, true, true); assertAllTopicPolicyProperties(getTopic("Before"), 10000, true); //change config setAllDestPolicyProperties(entry, false, false, 100, - 1000, 2000, 10000, 4000, 400, 300, false, false, 1000, false, false, + 1000, 2000, 10000, 4000, 400, 300, false, false, false, 1000, false, false, 300, false, false, false, false, false, false, false, false, false); setAllTopicPolicyProperties(entry, 100000, false); @@ -740,13 +740,13 @@ private void testAllTopicPropertiesAppliedFilter(Set properties) throws TimeUnit.SECONDS.sleep(SLEEP); assertAllDestPolicyProperties(getTopic("Before"), false, false, 100, - 1000, 2000, 10000, 4000, 400, 300, false, false, 1000, false, false, + 1000, 2000, 10000, 4000, 400, 300, false, false, false, 1000, false, false, 300, false, false, false, false, false, false, false, false, false); assertAllTopicPolicyProperties(getTopic("Before"), 100000, false); //check new dest assertAllDestPolicyProperties(getTopic("After"), false, false, 100, - 1000, 2000, 10000, 4000, 400, 300, false, false, 1000, false, false, + 1000, 2000, 10000, 4000, 400, 300, false, false, false, 1000, false, false, 300, false, false, false, false, false, false, false, false, false); assertAllTopicPolicyProperties(getTopic("After"), 100000, false); } @@ -820,6 +820,7 @@ private Set getDestPropertySet() { properties.add("cursorMemoryHighWaterMark"); properties.add("storeUsageHighWaterMark"); properties.add("gcInactiveDestinations"); + properties.add("gcWithOnlyWildcardConsumers"); properties.add("gcWithNetworkConsumers"); properties.add("inactiveTimeoutBeforeGC"); properties.add("reduceMemoryFootprint"); @@ -862,12 +863,12 @@ private void setAllTopicPolicyProperties(PolicyEntry entry, long memoryLimit, bo private void setAllDestPolicyProperties(PolicyEntry entry, boolean producerFlowControl, boolean alwaysRetroactive, long blockedProducerWarningInterval, int maxPageSize, int maxBrowsePageSize, long minimumMessageSize, int maxExpirePageSize, int cursorMemoryHighWaterMark, - int storeUsageHighWaterMark, boolean gcInactiveDestinations, boolean gcWithNetworkConsumers, - long inactiveTimeoutBeforeGC,boolean reduceMemoryFootprint, boolean doOptimizeMessageStore, - int optimizeMessageStoreInFlightLimit, boolean advisoryForConsumed, boolean advisoryForDelivery, - boolean advisoryForDispatched, boolean advisoryForDiscardingMessages, boolean advisoryForSlowConsumers, - boolean advisoryForFastProducers, boolean advisoryWhenFull, boolean includeBodyForAdvisory, - boolean sendAdvisoryIfNoConsumers) { + int storeUsageHighWaterMark, boolean gcInactiveDestinations, boolean gcWithOnlyWildcardConsumers, + boolean gcWithNetworkConsumers, long inactiveTimeoutBeforeGC, boolean reduceMemoryFootprint, + boolean doOptimizeMessageStore, int optimizeMessageStoreInFlightLimit, boolean advisoryForConsumed, + boolean advisoryForDelivery, boolean advisoryForDispatched, boolean advisoryForDiscardingMessages, + boolean advisoryForSlowConsumers, boolean advisoryForFastProducers, boolean advisoryWhenFull, + boolean includeBodyForAdvisory, boolean sendAdvisoryIfNoConsumers) { entry.setProducerFlowControl(producerFlowControl); entry.setAlwaysRetroactive(alwaysRetroactive); @@ -879,6 +880,7 @@ private void setAllDestPolicyProperties(PolicyEntry entry, boolean producerFlowC entry.setCursorMemoryHighWaterMark(cursorMemoryHighWaterMark); entry.setStoreUsageHighWaterMark(storeUsageHighWaterMark); entry.setGcInactiveDestinations(gcInactiveDestinations); + entry.setGcWithOnlyWildcardConsumers(gcWithOnlyWildcardConsumers); entry.setGcWithNetworkConsumers(gcWithNetworkConsumers); entry.setInactiveTimeoutBeforeGC(inactiveTimeoutBeforeGC); entry.setReduceMemoryFootprint(reduceMemoryFootprint); @@ -920,13 +922,12 @@ private void assertAllTopicPolicyProperties(Topic topic, long memoryLimit, boole private void assertAllDestPolicyProperties(BaseDestination dest, boolean producerFlowControl, boolean alwaysRetroactive, long blockedProducerWarningInterval, int maxPageSize, int maxBrowsePageSize, long minimumMessageSize, int maxExpirePageSize, int cursorMemoryHighWaterMark, - int storeUsageHighWaterMark, boolean gcInactiveDestinations, boolean gcWithNetworkConsumers, - long inactiveTimeoutBeforeGC,boolean reduceMemoryFootprint, boolean doOptimizeMessageStore, - int optimizeMessageStoreInFlightLimit, boolean advisoryForConsumed, boolean advisoryForDelivery, - boolean advisoryForDispatched, boolean advisoryForDiscardingMessages, boolean advisoryForSlowConsumers, - boolean advisoryForFastProducers, boolean advisoryWhenFull, boolean includeBodyForAdvisory, - boolean sendAdvisoryIfNoConsumers) { - + int storeUsageHighWaterMark, boolean gcInactiveDestinations, boolean gcWithOnlyWildcardConsumers, + boolean gcWithNetworkConsumers, long inactiveTimeoutBeforeGC, boolean reduceMemoryFootprint, + boolean doOptimizeMessageStore, int optimizeMessageStoreInFlightLimit, boolean advisoryForConsumed, + boolean advisoryForDelivery, boolean advisoryForDispatched, boolean advisoryForDiscardingMessages, + boolean advisoryForSlowConsumers, boolean advisoryForFastProducers, boolean advisoryWhenFull, + boolean includeBodyForAdvisory, boolean sendAdvisoryIfNoConsumers) { assertEquals(producerFlowControl, dest.isProducerFlowControl()); assertEquals(alwaysRetroactive, dest.isAlwaysRetroactive()); @@ -938,6 +939,7 @@ private void assertAllDestPolicyProperties(BaseDestination dest, boolean produce assertEquals(cursorMemoryHighWaterMark, dest.getCursorMemoryHighWaterMark()); assertEquals(storeUsageHighWaterMark, dest.getStoreUsageHighWaterMark()); assertEquals(gcInactiveDestinations, dest.isGcIfInactive()); + assertEquals(gcWithOnlyWildcardConsumers, dest.isGcWithOnlyWildcardConsumers()); assertEquals(gcWithNetworkConsumers, dest.isGcWithNetworkConsumers()); assertEquals(inactiveTimeoutBeforeGC, dest.getInactiveTimeoutBeforeGC()); assertEquals(reduceMemoryFootprint, dest.isReduceMemoryFootprint()); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/DestinationGCTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/DestinationGCTest.java index b728dba70b8..db8b41aab85 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/DestinationGCTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/DestinationGCTest.java @@ -47,6 +47,8 @@ public class DestinationGCTest { private final ActiveMQQueue queue = new ActiveMQQueue("TEST"); private final ActiveMQQueue otherQueue = new ActiveMQQueue("TEST-OTHER"); + private final ActiveMQQueue wildcardQueueA = new ActiveMQQueue("TEST.FOO.A"); + private final ActiveMQQueue wildcardQueueB = new ActiveMQQueue("TEST.FOO.B"); private BrokerService brokerService; @@ -68,6 +70,7 @@ public void tearDown() throws Exception { protected BrokerService createBroker() throws Exception { PolicyEntry entry = new PolicyEntry(); entry.setGcInactiveDestinations(true); + entry.setGcWithOnlyWildcardConsumers(true); entry.setInactiveTimeoutBeforeGC(3000); PolicyMap map = new PolicyMap(); map.setDefaultEntry(entry); @@ -111,6 +114,35 @@ public boolean isSatisified() throws Exception { connection.close(); } + @Test //(timeout = 60000) + public void testDestinationGCWithOnlyWildcardConsumers() throws Exception { + assertEquals(1, brokerService.getAdminView().getQueues().length); + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?create=false"); + + try(Connection connection = factory.createConnection()) { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + session.createProducer(wildcardQueueA).close(); + session.createProducer(wildcardQueueB).close(); + MessageConsumer consumer = session.createConsumer(session.createQueue("TEST.FOO.*")); + consumer.setMessageListener(new MessageListener() { + + @Override + public void onMessage(Message message) { + } + }); + + connection.start(); + + assertTrue("After GC runs there should be one Queue (count=" + brokerService.getAdminView().getQueues().length + ")", Wait.waitFor(new Condition() { + @Override + public boolean isSatisified() throws Exception { + return brokerService.getAdminView().getQueues().length == 1; + } + })); + } + } + @Test(timeout = 60000) public void testDestinationGc() throws Exception { assertEquals(1, brokerService.getAdminView().getQueues().length);