Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -260,16 +260,21 @@ protected List<Subscription> addSubscriptionsForDestination(ConnectionContext co
}

@Override
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout)
throws Exception {

public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
// No timeout.. then try to shut down right way, fails if there are
// current subscribers.
if (timeout == 0) {
for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
Subscription sub = iter.next();
if (sub.matches(destination) ) {
throw new JMSException("Destination: " + destination + " still has an active subscription: " + sub);
if(sub.isWildcard()) {
var dest = destinations.get(destination);
if(dest != null && dest.isGcWithOnlyWildcardConsumers()) {
continue;
}
} else {
throw new JMSException("Destination: " + destination + " still has an active subscription: " + sub);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ public abstract class BaseDestination implements Destination {
private long inactiveTimeoutBeforeGC = DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC;
private boolean gcIfInactive;
private boolean gcWithNetworkConsumers;
private long lastActiveTime=0l;
private boolean gcWithOnlyWildcardConsumers;
private long lastActiveTime = 0L;
private boolean reduceMemoryFootprint = false;
protected final Scheduler scheduler;
private boolean disposed = false;
Expand Down Expand Up @@ -311,12 +312,24 @@ public final MessageStore getMessageStore() {

@Override
public boolean isActive() {
boolean isActive = destinationStatistics.getConsumers().getCount() > 0 ||
destinationStatistics.getProducers().getCount() > 0;
if (isActive && isGcWithNetworkConsumers() && destinationStatistics.getConsumers().getCount() > 0) {
isActive = hasRegularConsumers(getConsumers());
if (destinationStatistics.getProducers().getCount() > 0) {
return true;
}
return isActive;

var destinationActive = true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not very efficient because it may need to iterate through the consumers more than once. The way this is written, it may iterate through the consumers twice if both isGcWithNetworkConsumers() and isGcWithOnlyWildcardConsumers() is true.

What would make more sense is to only iterate through the consumers one time and use the predicate API to specify what you are looking for. You could build a predicate that look for either a network consumer, or not a wildcard consumer or some combination quite easily using https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/function/Predicate.html . This would also make it easy to expand the criteria in the future for other conditions.

If all the flags are off then we wouldn't need to iterate through the consumers at all.

if (destinationStatistics.getConsumers().getCount() > 0) {
if (isGcWithNetworkConsumers()) {
destinationActive = hasRegularConsumers(getConsumers());
}

if (destinationActive &&
isGcWithOnlyWildcardConsumers()) {
destinationActive = !getConsumers().stream().allMatch(Subscription::isWildcard);
}
} else {
destinationActive = false;
}
return destinationActive;
}

@Override
Expand Down Expand Up @@ -824,19 +837,37 @@ public boolean isGcWithNetworkConsumers() {
return gcWithNetworkConsumers;
}

/**
* Indicate if it is ok to gc destinations that have only wildcard consumers
* @param gcWithOnlyWildcardConsumers
*/
public void setGcWithOnlyWildcardConsumers(boolean gcWithOnlyWildcardConsumers) {
this.gcWithOnlyWildcardConsumers = gcWithOnlyWildcardConsumers;
}

public boolean isGcWithOnlyWildcardConsumers() {
return gcWithOnlyWildcardConsumers;
}

@Override
public void markForGC(long timeStamp) {
if (isGcIfInactive() && this.lastActiveTime == 0 && isActive() == false
&& destinationStatistics.getMessages().getCount() == 0 && getInactiveTimeoutBeforeGC() > 0l) {
if (isGcIfInactive()
&& this.lastActiveTime == 0
&& destinationStatistics.getMessages().getCount() == 0
&& getInactiveTimeoutBeforeGC() > 0L
&& !isActive()) {
this.lastActiveTime = timeStamp;
}
}

@Override
public boolean canGC() {
boolean result = false;
final long currentLastActiveTime = this.lastActiveTime;
if (isGcIfInactive() && currentLastActiveTime != 0l && destinationStatistics.getMessages().getCount() == 0L ) {
var result = false;
final var currentLastActiveTime = this.lastActiveTime;
if (isGcIfInactive()
&& currentLastActiveTime != 0L
&& destinationStatistics.getMessages().getCount() == 0L
&& !isActive()) {
if ((System.currentTimeMillis() - currentLastActiveTime) >= getInactiveTimeoutBeforeGC()) {
result = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,4 +267,5 @@ public interface Destination extends Service, Task, Message.MessageDestination {

void setAdvancedMessageStatisticsEnabled(boolean advancedMessageStatisticsEnabled);

boolean isGcWithOnlyWildcardConsumers();
}
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,11 @@ public void setAdvancedMessageStatisticsEnabled(boolean advancedMessageStatistic
next.setAdvancedMessageStatisticsEnabled(advancedMessageStatisticsEnabled);
}

@Override
public boolean isGcWithOnlyWildcardConsumers() {
return next.isGcWithOnlyWildcardConsumers();
}

public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception {
if (next instanceof DestinationFilter) {
DestinationFilter filter = (DestinationFilter) next;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
*
*
*/
public class TempQueue extends Queue{
public class TempQueue extends Queue {
private static final Logger LOG = LoggerFactory.getLogger(TempQueue.class);
private final ActiveMQTempDestination tempDest;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void removeDestination(ConnectionContext context, ActiveMQDestination des

super.removeDestination(context, destination, timeout);
}

/*
* For a Queue, dispatch order is imperative to match acks, so the dispatch is deferred till
* the notification to ensure that the subscription chosen by the master is used.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public class PolicyEntry extends DestinationMapEntry {
private boolean prioritizedMessages;
private boolean allConsumersExclusiveByDefault;
private boolean gcInactiveDestinations;
private boolean gcWithOnlyWildcardConsumers;
private boolean gcWithNetworkConsumers;
private long inactiveTimeoutBeforeGC = BaseDestination.DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC;
private boolean reduceMemoryFootprint;
Expand Down Expand Up @@ -263,6 +264,9 @@ public void baseUpdate(BaseDestination destination, Set<String> includedProperti
if (isUpdate("gcInactiveDestinations", includedProperties)) {
destination.setGcIfInactive(isGcInactiveDestinations());
}
if (isUpdate("gcWithOnlyWildcardConsumers", includedProperties)) {
destination.setGcWithOnlyWildcardConsumers(isGcWithOnlyWildcardConsumers());
}
if (isUpdate("gcWithNetworkConsumers", includedProperties)) {
destination.setGcWithNetworkConsumers(isGcWithNetworkConsumers());
}
Expand Down Expand Up @@ -1082,6 +1086,14 @@ public void setInactiveTimeoutBeforeGC(long inactiveTimeoutBeforeGC) {
this.inactiveTimeoutBeforeGC = inactiveTimeoutBeforeGC;
}

public void setGcWithOnlyWildcardConsumers(boolean gcWithOnlyWildcardConsumers) {
this.gcWithOnlyWildcardConsumers = gcWithOnlyWildcardConsumers;
}

public boolean isGcWithOnlyWildcardConsumers() {
return gcWithOnlyWildcardConsumers;
}

public void setGcWithNetworkConsumers(boolean gcWithNetworkConsumers) {
this.gcWithNetworkConsumers = gcWithNetworkConsumers;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,7 @@ private void testAllQueuePropertiesAppliedFilter(Set<String> properties) throws

//initial config
setAllDestPolicyProperties(entry, true, true, 10,
100, 200, 1000, 400, 40, 30, true, true, 1000, true, true,
100, 200, 1000, 400, 40, 30, true, true, true, 1000, true, true,
30, true, true, true, true, true, true, true, true, true);
setAllQueuePolicyProperties(entry, 10000, true, true, true, true, 100,
100, true, true);
Expand All @@ -675,15 +675,15 @@ private void testAllQueuePropertiesAppliedFilter(Set<String> properties) throws

//validate config
assertAllDestPolicyProperties(getQueue("Before"), true, true, 10,
100, 200, 1000, 400, 40, 30, true, true, 1000, true, true,
100, 200, 1000, 400, 40, 30, true, true, true, 1000, true, true,
30, true, true, true,true, true, true, true, true, true);
assertAllQueuePolicyProperties(getQueue("Before"), 10000, true, true, true, true, 100,
100, true, true);


//change config
setAllDestPolicyProperties(entry, false, false, 100,
1000, 2000, 10000, 4000, 400, 300, false, false, 1000, false, false,
1000, 2000, 10000, 4000, 400, 300, false, false, false, 1000, false, false,
300, false, false, false,false, false, false, false, false, false);
setAllQueuePolicyProperties(entry, 100000, false, false, false, false, 1000,
1000, false, false);
Expand All @@ -692,14 +692,14 @@ private void testAllQueuePropertiesAppliedFilter(Set<String> properties) throws
TimeUnit.SECONDS.sleep(SLEEP);

assertAllDestPolicyProperties(getQueue("Before"), false, false, 100,
1000, 2000, 10000, 4000, 400, 300, false, false, 1000, false, false,
1000, 2000, 10000, 4000, 400, 300, false, false, false, 1000, false, false,
300, false, false, false,false, false, false, false, false, false);
assertAllQueuePolicyProperties(getQueue("Before"), 100000, false, false, false, false, 1000,
1000, false, false);

//check new dest
assertAllDestPolicyProperties(getQueue("After"), false, false, 100,
1000, 2000, 10000, 4000, 400, 300, false, false, 1000, false, false,
1000, 2000, 10000, 4000, 400, 300, false, false, false, 1000, false, false,
300, false, false, false, false, false, false, false, false, false);
assertAllQueuePolicyProperties(getQueue("After"), 100000, false, false, false, false, 1000,
1000, false, false);
Expand All @@ -713,7 +713,7 @@ private void testAllTopicPropertiesAppliedFilter(Set<String> properties) throws

//initial config
setAllDestPolicyProperties(entry, true, true, 10,
100, 200, 1000, 400, 40, 30, true, true, 1000, true, true,
100, 200, 1000, 400, 40, 30, true, true, true, 1000, true, true,
30, true, true, true, true, true, true, true, true, true);
setAllTopicPolicyProperties(entry, 10000, true);

Expand All @@ -725,28 +725,28 @@ private void testAllTopicPropertiesAppliedFilter(Set<String> properties) throws

//validate config
assertAllDestPolicyProperties(getTopic("Before"), true, true, 10,
100, 200, 1000, 400, 40, 30, true, true, 1000, true, true,
100, 200, 1000, 400, 40, 30, true, true, true, 1000, true, true,
30, true, true, true, true, true, true, true, true, true);
assertAllTopicPolicyProperties(getTopic("Before"), 10000, true);


//change config
setAllDestPolicyProperties(entry, false, false, 100,
1000, 2000, 10000, 4000, 400, 300, false, false, 1000, false, false,
1000, 2000, 10000, 4000, 400, 300, false, false, false, 1000, false, false,
300, false, false, false, false, false, false, false, false, false);
setAllTopicPolicyProperties(entry, 100000, false);

javaConfigBroker.modifyPolicyEntry(entry, false, properties);
TimeUnit.SECONDS.sleep(SLEEP);

assertAllDestPolicyProperties(getTopic("Before"), false, false, 100,
1000, 2000, 10000, 4000, 400, 300, false, false, 1000, false, false,
1000, 2000, 10000, 4000, 400, 300, false, false, false, 1000, false, false,
300, false, false, false, false, false, false, false, false, false);
assertAllTopicPolicyProperties(getTopic("Before"), 100000, false);

//check new dest
assertAllDestPolicyProperties(getTopic("After"), false, false, 100,
1000, 2000, 10000, 4000, 400, 300, false, false, 1000, false, false,
1000, 2000, 10000, 4000, 400, 300, false, false, false, 1000, false, false,
300, false, false, false, false, false, false, false, false, false);
assertAllTopicPolicyProperties(getTopic("After"), 100000, false);
}
Expand Down Expand Up @@ -820,6 +820,7 @@ private Set<String> getDestPropertySet() {
properties.add("cursorMemoryHighWaterMark");
properties.add("storeUsageHighWaterMark");
properties.add("gcInactiveDestinations");
properties.add("gcWithOnlyWildcardConsumers");
properties.add("gcWithNetworkConsumers");
properties.add("inactiveTimeoutBeforeGC");
properties.add("reduceMemoryFootprint");
Expand Down Expand Up @@ -862,12 +863,12 @@ private void setAllTopicPolicyProperties(PolicyEntry entry, long memoryLimit, bo
private void setAllDestPolicyProperties(PolicyEntry entry, boolean producerFlowControl,
boolean alwaysRetroactive, long blockedProducerWarningInterval, int maxPageSize,
int maxBrowsePageSize, long minimumMessageSize, int maxExpirePageSize, int cursorMemoryHighWaterMark,
int storeUsageHighWaterMark, boolean gcInactiveDestinations, boolean gcWithNetworkConsumers,
long inactiveTimeoutBeforeGC,boolean reduceMemoryFootprint, boolean doOptimizeMessageStore,
int optimizeMessageStoreInFlightLimit, boolean advisoryForConsumed, boolean advisoryForDelivery,
boolean advisoryForDispatched, boolean advisoryForDiscardingMessages, boolean advisoryForSlowConsumers,
boolean advisoryForFastProducers, boolean advisoryWhenFull, boolean includeBodyForAdvisory,
boolean sendAdvisoryIfNoConsumers) {
int storeUsageHighWaterMark, boolean gcInactiveDestinations, boolean gcWithOnlyWildcardConsumers,
boolean gcWithNetworkConsumers, long inactiveTimeoutBeforeGC, boolean reduceMemoryFootprint,
boolean doOptimizeMessageStore, int optimizeMessageStoreInFlightLimit, boolean advisoryForConsumed,
boolean advisoryForDelivery, boolean advisoryForDispatched, boolean advisoryForDiscardingMessages,
boolean advisoryForSlowConsumers, boolean advisoryForFastProducers, boolean advisoryWhenFull,
boolean includeBodyForAdvisory, boolean sendAdvisoryIfNoConsumers) {

entry.setProducerFlowControl(producerFlowControl);
entry.setAlwaysRetroactive(alwaysRetroactive);
Expand All @@ -879,6 +880,7 @@ private void setAllDestPolicyProperties(PolicyEntry entry, boolean producerFlowC
entry.setCursorMemoryHighWaterMark(cursorMemoryHighWaterMark);
entry.setStoreUsageHighWaterMark(storeUsageHighWaterMark);
entry.setGcInactiveDestinations(gcInactiveDestinations);
entry.setGcWithOnlyWildcardConsumers(gcWithOnlyWildcardConsumers);
entry.setGcWithNetworkConsumers(gcWithNetworkConsumers);
entry.setInactiveTimeoutBeforeGC(inactiveTimeoutBeforeGC);
entry.setReduceMemoryFootprint(reduceMemoryFootprint);
Expand Down Expand Up @@ -920,13 +922,12 @@ private void assertAllTopicPolicyProperties(Topic topic, long memoryLimit, boole
private void assertAllDestPolicyProperties(BaseDestination dest, boolean producerFlowControl,
boolean alwaysRetroactive, long blockedProducerWarningInterval, int maxPageSize,
int maxBrowsePageSize, long minimumMessageSize, int maxExpirePageSize, int cursorMemoryHighWaterMark,
int storeUsageHighWaterMark, boolean gcInactiveDestinations, boolean gcWithNetworkConsumers,
long inactiveTimeoutBeforeGC,boolean reduceMemoryFootprint, boolean doOptimizeMessageStore,
int optimizeMessageStoreInFlightLimit, boolean advisoryForConsumed, boolean advisoryForDelivery,
boolean advisoryForDispatched, boolean advisoryForDiscardingMessages, boolean advisoryForSlowConsumers,
boolean advisoryForFastProducers, boolean advisoryWhenFull, boolean includeBodyForAdvisory,
boolean sendAdvisoryIfNoConsumers) {

int storeUsageHighWaterMark, boolean gcInactiveDestinations, boolean gcWithOnlyWildcardConsumers,
boolean gcWithNetworkConsumers, long inactiveTimeoutBeforeGC, boolean reduceMemoryFootprint,
boolean doOptimizeMessageStore, int optimizeMessageStoreInFlightLimit, boolean advisoryForConsumed,
boolean advisoryForDelivery, boolean advisoryForDispatched, boolean advisoryForDiscardingMessages,
boolean advisoryForSlowConsumers, boolean advisoryForFastProducers, boolean advisoryWhenFull,
boolean includeBodyForAdvisory, boolean sendAdvisoryIfNoConsumers) {

assertEquals(producerFlowControl, dest.isProducerFlowControl());
assertEquals(alwaysRetroactive, dest.isAlwaysRetroactive());
Expand All @@ -938,6 +939,7 @@ private void assertAllDestPolicyProperties(BaseDestination dest, boolean produce
assertEquals(cursorMemoryHighWaterMark, dest.getCursorMemoryHighWaterMark());
assertEquals(storeUsageHighWaterMark, dest.getStoreUsageHighWaterMark());
assertEquals(gcInactiveDestinations, dest.isGcIfInactive());
assertEquals(gcWithOnlyWildcardConsumers, dest.isGcWithOnlyWildcardConsumers());
assertEquals(gcWithNetworkConsumers, dest.isGcWithNetworkConsumers());
assertEquals(inactiveTimeoutBeforeGC, dest.getInactiveTimeoutBeforeGC());
assertEquals(reduceMemoryFootprint, dest.isReduceMemoryFootprint());
Expand Down
Loading