Skip to content

Commit d8f0981

Browse files
committed
[AMQ-9692] Support destination gc sweep of destinations with only wildcard consumers
1 parent 2eb0d66 commit d8f0981

File tree

9 files changed

+128
-40
lines changed

9 files changed

+128
-40
lines changed

activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -260,16 +260,21 @@ protected List<Subscription> addSubscriptionsForDestination(ConnectionContext co
260260
}
261261

262262
@Override
263-
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout)
264-
throws Exception {
265-
263+
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
266264
// No timeout.. then try to shut down right way, fails if there are
267265
// current subscribers.
268266
if (timeout == 0) {
269267
for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
270268
Subscription sub = iter.next();
271269
if (sub.matches(destination) ) {
272-
throw new JMSException("Destination: " + destination + " still has an active subscription: " + sub);
270+
if(sub.isWildcard()) {
271+
var dest = destinations.get(destination);
272+
if(dest != null && dest.isGcWithOnlyWildcardConsumers()) {
273+
continue;
274+
}
275+
} else {
276+
throw new JMSException("Destination: " + destination + " still has an active subscription: " + sub);
277+
}
273278
}
274279
}
275280
}

activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java

Lines changed: 42 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,8 @@ public abstract class BaseDestination implements Destination {
105105
private long inactiveTimeoutBeforeGC = DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC;
106106
private boolean gcIfInactive;
107107
private boolean gcWithNetworkConsumers;
108-
private long lastActiveTime=0l;
108+
private boolean gcWithOnlyWildcardConsumers;
109+
private long lastActiveTime = 0L;
109110
private boolean reduceMemoryFootprint = false;
110111
protected final Scheduler scheduler;
111112
private boolean disposed = false;
@@ -311,12 +312,24 @@ public final MessageStore getMessageStore() {
311312

312313
@Override
313314
public boolean isActive() {
314-
boolean isActive = destinationStatistics.getConsumers().getCount() > 0 ||
315-
destinationStatistics.getProducers().getCount() > 0;
316-
if (isActive && isGcWithNetworkConsumers() && destinationStatistics.getConsumers().getCount() > 0) {
317-
isActive = hasRegularConsumers(getConsumers());
315+
if (destinationStatistics.getProducers().getCount() > 0) {
316+
return true;
318317
}
319-
return isActive;
318+
319+
var destinationActive = true;
320+
if (destinationStatistics.getConsumers().getCount() > 0) {
321+
if (isGcWithNetworkConsumers()) {
322+
destinationActive = hasRegularConsumers(getConsumers());
323+
}
324+
325+
if (destinationActive &&
326+
isGcWithOnlyWildcardConsumers()) {
327+
destinationActive = !getConsumers().stream().allMatch(Subscription::isWildcard);
328+
}
329+
} else {
330+
destinationActive = false;
331+
}
332+
return destinationActive;
320333
}
321334

322335
@Override
@@ -824,19 +837,37 @@ public boolean isGcWithNetworkConsumers() {
824837
return gcWithNetworkConsumers;
825838
}
826839

840+
/**
841+
* Indicate if it is ok to gc destinations that have only wildcard consumers
842+
* @param gcWithOnlyWildcardConsumers
843+
*/
844+
public void setGcWithOnlyWildcardConsumers(boolean gcWithOnlyWildcardConsumers) {
845+
this.gcWithOnlyWildcardConsumers = gcWithOnlyWildcardConsumers;
846+
}
847+
848+
public boolean isGcWithOnlyWildcardConsumers() {
849+
return gcWithOnlyWildcardConsumers;
850+
}
851+
827852
@Override
828853
public void markForGC(long timeStamp) {
829-
if (isGcIfInactive() && this.lastActiveTime == 0 && isActive() == false
830-
&& destinationStatistics.getMessages().getCount() == 0 && getInactiveTimeoutBeforeGC() > 0l) {
854+
if (isGcIfInactive()
855+
&& this.lastActiveTime == 0
856+
&& destinationStatistics.getMessages().getCount() == 0
857+
&& getInactiveTimeoutBeforeGC() > 0L
858+
&& !isActive()) {
831859
this.lastActiveTime = timeStamp;
832860
}
833861
}
834862

835863
@Override
836864
public boolean canGC() {
837-
boolean result = false;
838-
final long currentLastActiveTime = this.lastActiveTime;
839-
if (isGcIfInactive() && currentLastActiveTime != 0l && destinationStatistics.getMessages().getCount() == 0L ) {
865+
var result = false;
866+
final var currentLastActiveTime = this.lastActiveTime;
867+
if (isGcIfInactive()
868+
&& currentLastActiveTime != 0L
869+
&& destinationStatistics.getMessages().getCount() == 0L
870+
&& !isActive()) {
840871
if ((System.currentTimeMillis() - currentLastActiveTime) >= getInactiveTimeoutBeforeGC()) {
841872
result = true;
842873
}

activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,4 +267,5 @@ public interface Destination extends Service, Task, Message.MessageDestination {
267267

268268
void setAdvancedMessageStatisticsEnabled(boolean advancedMessageStatisticsEnabled);
269269

270+
boolean isGcWithOnlyWildcardConsumers();
270271
}

activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -429,6 +429,11 @@ public void setAdvancedMessageStatisticsEnabled(boolean advancedMessageStatistic
429429
next.setAdvancedMessageStatisticsEnabled(advancedMessageStatisticsEnabled);
430430
}
431431

432+
@Override
433+
public boolean isGcWithOnlyWildcardConsumers() {
434+
return next.isGcWithOnlyWildcardConsumers();
435+
}
436+
432437
public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception {
433438
if (next instanceof DestinationFilter) {
434439
DestinationFilter filter = (DestinationFilter) next;

activemq-broker/src/main/java/org/apache/activemq/broker/region/TempQueue.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
*
3434
*
3535
*/
36-
public class TempQueue extends Queue{
36+
public class TempQueue extends Queue {
3737
private static final Logger LOG = LoggerFactory.getLogger(TempQueue.class);
3838
private final ActiveMQTempDestination tempDest;
3939

activemq-broker/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public void removeDestination(ConnectionContext context, ActiveMQDestination des
5858

5959
super.removeDestination(context, destination, timeout);
6060
}
61-
61+
6262
/*
6363
* For a Queue, dispatch order is imperative to match acks, so the dispatch is deferred till
6464
* the notification to ensure that the subscription chosen by the master is used.

activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ public class PolicyEntry extends DestinationMapEntry {
9999
private boolean prioritizedMessages;
100100
private boolean allConsumersExclusiveByDefault;
101101
private boolean gcInactiveDestinations;
102+
private boolean gcWithOnlyWildcardConsumers;
102103
private boolean gcWithNetworkConsumers;
103104
private long inactiveTimeoutBeforeGC = BaseDestination.DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC;
104105
private boolean reduceMemoryFootprint;
@@ -263,6 +264,9 @@ public void baseUpdate(BaseDestination destination, Set<String> includedProperti
263264
if (isUpdate("gcInactiveDestinations", includedProperties)) {
264265
destination.setGcIfInactive(isGcInactiveDestinations());
265266
}
267+
if (isUpdate("gcWithOnlyWildcardConsumers", includedProperties)) {
268+
destination.setGcWithOnlyWildcardConsumers(isGcWithOnlyWildcardConsumers());
269+
}
266270
if (isUpdate("gcWithNetworkConsumers", includedProperties)) {
267271
destination.setGcWithNetworkConsumers(isGcWithNetworkConsumers());
268272
}
@@ -1082,6 +1086,14 @@ public void setInactiveTimeoutBeforeGC(long inactiveTimeoutBeforeGC) {
10821086
this.inactiveTimeoutBeforeGC = inactiveTimeoutBeforeGC;
10831087
}
10841088

1089+
public void setGcWithOnlyWildcardConsumers(boolean gcWithOnlyWildcardConsumers) {
1090+
this.gcWithOnlyWildcardConsumers = gcWithOnlyWildcardConsumers;
1091+
}
1092+
1093+
public boolean isGcWithOnlyWildcardConsumers() {
1094+
return gcWithOnlyWildcardConsumers;
1095+
}
1096+
10851097
public void setGcWithNetworkConsumers(boolean gcWithNetworkConsumers) {
10861098
this.gcWithNetworkConsumers = gcWithNetworkConsumers;
10871099
}

activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaPolicyEntryTest.java

Lines changed: 25 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -662,7 +662,7 @@ private void testAllQueuePropertiesAppliedFilter(Set<String> properties) throws
662662

663663
//initial config
664664
setAllDestPolicyProperties(entry, true, true, 10,
665-
100, 200, 1000, 400, 40, 30, true, true, 1000, true, true,
665+
100, 200, 1000, 400, 40, 30, true, true, true, 1000, true, true,
666666
30, true, true, true, true, true, true, true, true, true);
667667
setAllQueuePolicyProperties(entry, 10000, true, true, true, true, 100,
668668
100, true, true);
@@ -675,15 +675,15 @@ private void testAllQueuePropertiesAppliedFilter(Set<String> properties) throws
675675

676676
//validate config
677677
assertAllDestPolicyProperties(getQueue("Before"), true, true, 10,
678-
100, 200, 1000, 400, 40, 30, true, true, 1000, true, true,
678+
100, 200, 1000, 400, 40, 30, true, true, true, 1000, true, true,
679679
30, true, true, true,true, true, true, true, true, true);
680680
assertAllQueuePolicyProperties(getQueue("Before"), 10000, true, true, true, true, 100,
681681
100, true, true);
682682

683683

684684
//change config
685685
setAllDestPolicyProperties(entry, false, false, 100,
686-
1000, 2000, 10000, 4000, 400, 300, false, false, 1000, false, false,
686+
1000, 2000, 10000, 4000, 400, 300, false, false, false, 1000, false, false,
687687
300, false, false, false,false, false, false, false, false, false);
688688
setAllQueuePolicyProperties(entry, 100000, false, false, false, false, 1000,
689689
1000, false, false);
@@ -692,14 +692,14 @@ private void testAllQueuePropertiesAppliedFilter(Set<String> properties) throws
692692
TimeUnit.SECONDS.sleep(SLEEP);
693693

694694
assertAllDestPolicyProperties(getQueue("Before"), false, false, 100,
695-
1000, 2000, 10000, 4000, 400, 300, false, false, 1000, false, false,
695+
1000, 2000, 10000, 4000, 400, 300, false, false, false, 1000, false, false,
696696
300, false, false, false,false, false, false, false, false, false);
697697
assertAllQueuePolicyProperties(getQueue("Before"), 100000, false, false, false, false, 1000,
698698
1000, false, false);
699699

700700
//check new dest
701701
assertAllDestPolicyProperties(getQueue("After"), false, false, 100,
702-
1000, 2000, 10000, 4000, 400, 300, false, false, 1000, false, false,
702+
1000, 2000, 10000, 4000, 400, 300, false, false, false, 1000, false, false,
703703
300, false, false, false, false, false, false, false, false, false);
704704
assertAllQueuePolicyProperties(getQueue("After"), 100000, false, false, false, false, 1000,
705705
1000, false, false);
@@ -713,7 +713,7 @@ private void testAllTopicPropertiesAppliedFilter(Set<String> properties) throws
713713

714714
//initial config
715715
setAllDestPolicyProperties(entry, true, true, 10,
716-
100, 200, 1000, 400, 40, 30, true, true, 1000, true, true,
716+
100, 200, 1000, 400, 40, 30, true, true, true, 1000, true, true,
717717
30, true, true, true, true, true, true, true, true, true);
718718
setAllTopicPolicyProperties(entry, 10000, true);
719719

@@ -725,28 +725,28 @@ private void testAllTopicPropertiesAppliedFilter(Set<String> properties) throws
725725

726726
//validate config
727727
assertAllDestPolicyProperties(getTopic("Before"), true, true, 10,
728-
100, 200, 1000, 400, 40, 30, true, true, 1000, true, true,
728+
100, 200, 1000, 400, 40, 30, true, true, true, 1000, true, true,
729729
30, true, true, true, true, true, true, true, true, true);
730730
assertAllTopicPolicyProperties(getTopic("Before"), 10000, true);
731731

732732

733733
//change config
734734
setAllDestPolicyProperties(entry, false, false, 100,
735-
1000, 2000, 10000, 4000, 400, 300, false, false, 1000, false, false,
735+
1000, 2000, 10000, 4000, 400, 300, false, false, false, 1000, false, false,
736736
300, false, false, false, false, false, false, false, false, false);
737737
setAllTopicPolicyProperties(entry, 100000, false);
738738

739739
javaConfigBroker.modifyPolicyEntry(entry, false, properties);
740740
TimeUnit.SECONDS.sleep(SLEEP);
741741

742742
assertAllDestPolicyProperties(getTopic("Before"), false, false, 100,
743-
1000, 2000, 10000, 4000, 400, 300, false, false, 1000, false, false,
743+
1000, 2000, 10000, 4000, 400, 300, false, false, false, 1000, false, false,
744744
300, false, false, false, false, false, false, false, false, false);
745745
assertAllTopicPolicyProperties(getTopic("Before"), 100000, false);
746746

747747
//check new dest
748748
assertAllDestPolicyProperties(getTopic("After"), false, false, 100,
749-
1000, 2000, 10000, 4000, 400, 300, false, false, 1000, false, false,
749+
1000, 2000, 10000, 4000, 400, 300, false, false, false, 1000, false, false,
750750
300, false, false, false, false, false, false, false, false, false);
751751
assertAllTopicPolicyProperties(getTopic("After"), 100000, false);
752752
}
@@ -820,6 +820,7 @@ private Set<String> getDestPropertySet() {
820820
properties.add("cursorMemoryHighWaterMark");
821821
properties.add("storeUsageHighWaterMark");
822822
properties.add("gcInactiveDestinations");
823+
properties.add("gcWithOnlyWildcardConsumers");
823824
properties.add("gcWithNetworkConsumers");
824825
properties.add("inactiveTimeoutBeforeGC");
825826
properties.add("reduceMemoryFootprint");
@@ -862,12 +863,12 @@ private void setAllTopicPolicyProperties(PolicyEntry entry, long memoryLimit, bo
862863
private void setAllDestPolicyProperties(PolicyEntry entry, boolean producerFlowControl,
863864
boolean alwaysRetroactive, long blockedProducerWarningInterval, int maxPageSize,
864865
int maxBrowsePageSize, long minimumMessageSize, int maxExpirePageSize, int cursorMemoryHighWaterMark,
865-
int storeUsageHighWaterMark, boolean gcInactiveDestinations, boolean gcWithNetworkConsumers,
866-
long inactiveTimeoutBeforeGC,boolean reduceMemoryFootprint, boolean doOptimizeMessageStore,
867-
int optimizeMessageStoreInFlightLimit, boolean advisoryForConsumed, boolean advisoryForDelivery,
868-
boolean advisoryForDispatched, boolean advisoryForDiscardingMessages, boolean advisoryForSlowConsumers,
869-
boolean advisoryForFastProducers, boolean advisoryWhenFull, boolean includeBodyForAdvisory,
870-
boolean sendAdvisoryIfNoConsumers) {
866+
int storeUsageHighWaterMark, boolean gcInactiveDestinations, boolean gcWithOnlyWildcardConsumers,
867+
boolean gcWithNetworkConsumers, long inactiveTimeoutBeforeGC, boolean reduceMemoryFootprint,
868+
boolean doOptimizeMessageStore, int optimizeMessageStoreInFlightLimit, boolean advisoryForConsumed,
869+
boolean advisoryForDelivery, boolean advisoryForDispatched, boolean advisoryForDiscardingMessages,
870+
boolean advisoryForSlowConsumers, boolean advisoryForFastProducers, boolean advisoryWhenFull,
871+
boolean includeBodyForAdvisory, boolean sendAdvisoryIfNoConsumers) {
871872

872873
entry.setProducerFlowControl(producerFlowControl);
873874
entry.setAlwaysRetroactive(alwaysRetroactive);
@@ -879,6 +880,7 @@ private void setAllDestPolicyProperties(PolicyEntry entry, boolean producerFlowC
879880
entry.setCursorMemoryHighWaterMark(cursorMemoryHighWaterMark);
880881
entry.setStoreUsageHighWaterMark(storeUsageHighWaterMark);
881882
entry.setGcInactiveDestinations(gcInactiveDestinations);
883+
entry.setGcWithOnlyWildcardConsumers(gcWithOnlyWildcardConsumers);
882884
entry.setGcWithNetworkConsumers(gcWithNetworkConsumers);
883885
entry.setInactiveTimeoutBeforeGC(inactiveTimeoutBeforeGC);
884886
entry.setReduceMemoryFootprint(reduceMemoryFootprint);
@@ -920,13 +922,12 @@ private void assertAllTopicPolicyProperties(Topic topic, long memoryLimit, boole
920922
private void assertAllDestPolicyProperties(BaseDestination dest, boolean producerFlowControl,
921923
boolean alwaysRetroactive, long blockedProducerWarningInterval, int maxPageSize,
922924
int maxBrowsePageSize, long minimumMessageSize, int maxExpirePageSize, int cursorMemoryHighWaterMark,
923-
int storeUsageHighWaterMark, boolean gcInactiveDestinations, boolean gcWithNetworkConsumers,
924-
long inactiveTimeoutBeforeGC,boolean reduceMemoryFootprint, boolean doOptimizeMessageStore,
925-
int optimizeMessageStoreInFlightLimit, boolean advisoryForConsumed, boolean advisoryForDelivery,
926-
boolean advisoryForDispatched, boolean advisoryForDiscardingMessages, boolean advisoryForSlowConsumers,
927-
boolean advisoryForFastProducers, boolean advisoryWhenFull, boolean includeBodyForAdvisory,
928-
boolean sendAdvisoryIfNoConsumers) {
929-
925+
int storeUsageHighWaterMark, boolean gcInactiveDestinations, boolean gcWithOnlyWildcardConsumers,
926+
boolean gcWithNetworkConsumers, long inactiveTimeoutBeforeGC, boolean reduceMemoryFootprint,
927+
boolean doOptimizeMessageStore, int optimizeMessageStoreInFlightLimit, boolean advisoryForConsumed,
928+
boolean advisoryForDelivery, boolean advisoryForDispatched, boolean advisoryForDiscardingMessages,
929+
boolean advisoryForSlowConsumers, boolean advisoryForFastProducers, boolean advisoryWhenFull,
930+
boolean includeBodyForAdvisory, boolean sendAdvisoryIfNoConsumers) {
930931

931932
assertEquals(producerFlowControl, dest.isProducerFlowControl());
932933
assertEquals(alwaysRetroactive, dest.isAlwaysRetroactive());
@@ -938,6 +939,7 @@ private void assertAllDestPolicyProperties(BaseDestination dest, boolean produce
938939
assertEquals(cursorMemoryHighWaterMark, dest.getCursorMemoryHighWaterMark());
939940
assertEquals(storeUsageHighWaterMark, dest.getStoreUsageHighWaterMark());
940941
assertEquals(gcInactiveDestinations, dest.isGcIfInactive());
942+
assertEquals(gcWithOnlyWildcardConsumers, dest.isGcWithOnlyWildcardConsumers());
941943
assertEquals(gcWithNetworkConsumers, dest.isGcWithNetworkConsumers());
942944
assertEquals(inactiveTimeoutBeforeGC, dest.getInactiveTimeoutBeforeGC());
943945
assertEquals(reduceMemoryFootprint, dest.isReduceMemoryFootprint());

0 commit comments

Comments
 (0)