Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -260,16 +260,21 @@ protected List<Subscription> addSubscriptionsForDestination(ConnectionContext co
}

@Override
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout)
throws Exception {

public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
// No timeout.. then try to shut down right way, fails if there are
// current subscribers.
if (timeout == 0) {
for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
Subscription sub = iter.next();
if (sub.matches(destination) ) {
throw new JMSException("Destination: " + destination + " still has an active subscription: " + sub);
if(sub.isWildcard()) {
var dest = destinations.get(destination);
if(dest != null && dest.isGcWithOnlyWildcardConsumers()) {
Copy link
Contributor

@cshannon cshannon Jan 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There still seems to be a missing test case because this logic is broken, Intellij flagged it. This continue does nothing as it's in a loop and this will not throw the exception if isGcWithOnlyWildcardConsumers() is false.

This should likely just be rewritten as something like and include a test that would catch a similar mistake in the future.

if (sub.matches(destination) ) {
    var dest = destinations.get(destination);
    if(!sub.isWildcard() || dest == null || !dest.isGcWithOnlyWildcardConsumers()) {
        throw new JMSException("Destination: " + destination + " still has an active subscription: " + sub);
    } 
}

Copy link
Contributor Author

@mattrpav mattrpav Jan 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would this also need to look for the same for network-only subscribers for consistency? Or better yet perhaps re-use the isActive() or canGC() methods?

Note: This method is invoked by the RegionBroker gc check after confirming the expiration rules apply for the given destination.

In the case of GC, this is a repeat of the check.

In the case of an admin operation (ie via CLI or JMX) this is the only check to prevent deleting a destination w/ active subscribers.

Copy link
Contributor

@cshannon cshannon Jan 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isActive() and canGC() are a bit different because they also check for active producers. I'm not entirely sure why the method to remove was written not to care about producers, maybe an oversight, but changing that now would be a behavior change that could cause some side effects that are unexpected. Re-using logic is good but maybe it just needs to be subdivided to only check for active consumers (that method already exists and isActive() calls it now).

As you pointed out, the RegionBroker calls this to delete the destination but already after the gc check so we know there are no active consumer/producers. So the real question is whether or not we care about blocking a manual deletion through JMX if there are active producers. It seems odd to not check that but changing that now as I said is a behavior change that could break someone's use case. (I suppose i could see someone wanting to block deletes with active consumers but not producers).

Your point about it not checking the gcWithNetworkConsumers flag is valid and probably a bug. In fact it proves the point that we should be sharing the logic because the GC automated check will check it but not the manual deletion.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a long list of things that are like this where we may want to revisit for AMQ 7 where we want to change default behavior or settings but it's technically a breaking change so a major version is more appropriate. I guess whether or not we change this to check active producers depends on if we consider it a bug or not.

If we change it I would think by default this would share the same logic as the isActive and GC check and that should block removal by default even by an admin through JMX/console etc to prevent mistakes. But, if we do that we could add a force flag that would allow an administrator to force delete, but all of this is kind of out scope of the work for this.

continue;
}
} else {
throw new JMSException("Destination: " + destination + " still has an active subscription: " + sub);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;

import jakarta.jms.ResourceAllocationException;

Expand Down Expand Up @@ -105,7 +106,8 @@ public abstract class BaseDestination implements Destination {
private long inactiveTimeoutBeforeGC = DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC;
private boolean gcIfInactive;
private boolean gcWithNetworkConsumers;
private long lastActiveTime=0l;
private boolean gcWithOnlyWildcardConsumers;
private long lastActiveTime = 0L;
private boolean reduceMemoryFootprint = false;
protected final Scheduler scheduler;
private boolean disposed = false;
Expand Down Expand Up @@ -311,12 +313,37 @@ public final MessageStore getMessageStore() {

@Override
public boolean isActive() {
boolean isActive = destinationStatistics.getConsumers().getCount() > 0 ||
destinationStatistics.getProducers().getCount() > 0;
if (isActive && isGcWithNetworkConsumers() && destinationStatistics.getConsumers().getCount() > 0) {
isActive = hasRegularConsumers(getConsumers());
// if we have producers then we are active
if (destinationStatistics.getProducers().getCount() > 0) {
return true;
}
return isActive;

// Check if we have active consumers that should prevent GC
if (destinationStatistics.getConsumers().getCount() > 0) {
// if we have consumers and both gcWithNetwork and gcOnlyWildcard consumers
// are false we can just return true, otherwise we need to check each consumer
return (!isGcWithNetworkConsumers() && !isGcWithOnlyWildcardConsumers()) ||
hasActiveConsumers();
}

return false;
}

protected Predicate<Subscription> canGcConsumer = subscription -> {
// if isGcWithNetworkConsumers() is true and this is a network subscription then we can GC
boolean canGcNetwork = isGcWithNetworkConsumers() && subscription.getConsumerInfo().isNetworkSubscription();
// if isGcWithOnlyWildcardConsumers() is true and this is a wildcard then we can GC
return canGcNetwork || (isGcWithOnlyWildcardConsumers() && subscription.isWildcard());
};

protected boolean hasActiveConsumers() {
final List<Subscription> consumers = getConsumers();
for (Subscription subscription: consumers) {
if (!canGcConsumer.test(subscription)) {
return true;
}
}
return false;
}

@Override
Expand Down Expand Up @@ -824,19 +851,37 @@ public boolean isGcWithNetworkConsumers() {
return gcWithNetworkConsumers;
}

/**
* Indicate if it is ok to gc destinations that have only wildcard consumers
* @param gcWithOnlyWildcardConsumers
*/
public void setGcWithOnlyWildcardConsumers(boolean gcWithOnlyWildcardConsumers) {
this.gcWithOnlyWildcardConsumers = gcWithOnlyWildcardConsumers;
}

public boolean isGcWithOnlyWildcardConsumers() {
return gcWithOnlyWildcardConsumers;
}

@Override
public void markForGC(long timeStamp) {
if (isGcIfInactive() && this.lastActiveTime == 0 && isActive() == false
&& destinationStatistics.getMessages().getCount() == 0 && getInactiveTimeoutBeforeGC() > 0l) {
if (isGcIfInactive()
&& this.lastActiveTime == 0
&& destinationStatistics.getMessages().getCount() == 0
&& getInactiveTimeoutBeforeGC() > 0L
&& !isActive()) {
this.lastActiveTime = timeStamp;
}
}

@Override
public boolean canGC() {
boolean result = false;
final long currentLastActiveTime = this.lastActiveTime;
if (isGcIfInactive() && currentLastActiveTime != 0l && destinationStatistics.getMessages().getCount() == 0L ) {
var result = false;
final var currentLastActiveTime = this.lastActiveTime;
if (isGcIfInactive()
&& currentLastActiveTime != 0L
&& destinationStatistics.getMessages().getCount() == 0L
&& !isActive()) {
if ((System.currentTimeMillis() - currentLastActiveTime) >= getInactiveTimeoutBeforeGC()) {
result = true;
}
Expand Down Expand Up @@ -893,17 +938,6 @@ public void setAdvancedMessageStatisticsEnabled(boolean advancedMessageStatistic
@Override
public abstract List<Subscription> getConsumers();

protected boolean hasRegularConsumers(List<Subscription> consumers) {
boolean hasRegularConsumers = false;
for (Subscription subscription: consumers) {
if (!subscription.getConsumerInfo().isNetworkSubscription()) {
hasRegularConsumers = true;
break;
}
}
return hasRegularConsumers;
}

public ConnectionContext createConnectionContext() {
ConnectionContext answer = new ConnectionContext();
answer.setBroker(this.broker);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,4 +267,5 @@ public interface Destination extends Service, Task, Message.MessageDestination {

void setAdvancedMessageStatisticsEnabled(boolean advancedMessageStatisticsEnabled);

boolean isGcWithOnlyWildcardConsumers();
}
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,11 @@ public void setAdvancedMessageStatisticsEnabled(boolean advancedMessageStatistic
next.setAdvancedMessageStatisticsEnabled(advancedMessageStatisticsEnabled);
}

@Override
public boolean isGcWithOnlyWildcardConsumers() {
return next.isGcWithOnlyWildcardConsumers();
}

public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception {
if (next instanceof DestinationFilter) {
DestinationFilter filter = (DestinationFilter) next;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
*
*
*/
public class TempQueue extends Queue{
public class TempQueue extends Queue {
private static final Logger LOG = LoggerFactory.getLogger(TempQueue.class);
private final ActiveMQTempDestination tempDest;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void removeDestination(ConnectionContext context, ActiveMQDestination des

super.removeDestination(context, destination, timeout);
}

/*
* For a Queue, dispatch order is imperative to match acks, so the dispatch is deferred till
* the notification to ensure that the subscription chosen by the master is used.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public class PolicyEntry extends DestinationMapEntry {
private boolean prioritizedMessages;
private boolean allConsumersExclusiveByDefault;
private boolean gcInactiveDestinations;
private boolean gcWithOnlyWildcardConsumers;
private boolean gcWithNetworkConsumers;
private long inactiveTimeoutBeforeGC = BaseDestination.DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC;
private boolean reduceMemoryFootprint;
Expand Down Expand Up @@ -263,6 +264,9 @@ public void baseUpdate(BaseDestination destination, Set<String> includedProperti
if (isUpdate("gcInactiveDestinations", includedProperties)) {
destination.setGcIfInactive(isGcInactiveDestinations());
}
if (isUpdate("gcWithOnlyWildcardConsumers", includedProperties)) {
destination.setGcWithOnlyWildcardConsumers(isGcWithOnlyWildcardConsumers());
}
if (isUpdate("gcWithNetworkConsumers", includedProperties)) {
destination.setGcWithNetworkConsumers(isGcWithNetworkConsumers());
}
Expand Down Expand Up @@ -1082,6 +1086,14 @@ public void setInactiveTimeoutBeforeGC(long inactiveTimeoutBeforeGC) {
this.inactiveTimeoutBeforeGC = inactiveTimeoutBeforeGC;
}

public void setGcWithOnlyWildcardConsumers(boolean gcWithOnlyWildcardConsumers) {
this.gcWithOnlyWildcardConsumers = gcWithOnlyWildcardConsumers;
}

public boolean isGcWithOnlyWildcardConsumers() {
return gcWithOnlyWildcardConsumers;
}

public void setGcWithNetworkConsumers(boolean gcWithNetworkConsumers) {
this.gcWithNetworkConsumers = gcWithNetworkConsumers;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,7 @@ private void testAllQueuePropertiesAppliedFilter(Set<String> properties) throws

//initial config
setAllDestPolicyProperties(entry, true, true, 10,
100, 200, 1000, 400, 40, 30, true, true, 1000, true, true,
100, 200, 1000, 400, 40, 30, true, true, true, 1000, true, true,
30, true, true, true, true, true, true, true, true, true);
setAllQueuePolicyProperties(entry, 10000, true, true, true, true, 100,
100, true, true);
Expand All @@ -675,15 +675,15 @@ private void testAllQueuePropertiesAppliedFilter(Set<String> properties) throws

//validate config
assertAllDestPolicyProperties(getQueue("Before"), true, true, 10,
100, 200, 1000, 400, 40, 30, true, true, 1000, true, true,
100, 200, 1000, 400, 40, 30, true, true, true, 1000, true, true,
30, true, true, true,true, true, true, true, true, true);
assertAllQueuePolicyProperties(getQueue("Before"), 10000, true, true, true, true, 100,
100, true, true);


//change config
setAllDestPolicyProperties(entry, false, false, 100,
1000, 2000, 10000, 4000, 400, 300, false, false, 1000, false, false,
1000, 2000, 10000, 4000, 400, 300, false, false, false, 1000, false, false,
300, false, false, false,false, false, false, false, false, false);
setAllQueuePolicyProperties(entry, 100000, false, false, false, false, 1000,
1000, false, false);
Expand All @@ -692,14 +692,14 @@ private void testAllQueuePropertiesAppliedFilter(Set<String> properties) throws
TimeUnit.SECONDS.sleep(SLEEP);

assertAllDestPolicyProperties(getQueue("Before"), false, false, 100,
1000, 2000, 10000, 4000, 400, 300, false, false, 1000, false, false,
1000, 2000, 10000, 4000, 400, 300, false, false, false, 1000, false, false,
300, false, false, false,false, false, false, false, false, false);
assertAllQueuePolicyProperties(getQueue("Before"), 100000, false, false, false, false, 1000,
1000, false, false);

//check new dest
assertAllDestPolicyProperties(getQueue("After"), false, false, 100,
1000, 2000, 10000, 4000, 400, 300, false, false, 1000, false, false,
1000, 2000, 10000, 4000, 400, 300, false, false, false, 1000, false, false,
300, false, false, false, false, false, false, false, false, false);
assertAllQueuePolicyProperties(getQueue("After"), 100000, false, false, false, false, 1000,
1000, false, false);
Expand All @@ -713,7 +713,7 @@ private void testAllTopicPropertiesAppliedFilter(Set<String> properties) throws

//initial config
setAllDestPolicyProperties(entry, true, true, 10,
100, 200, 1000, 400, 40, 30, true, true, 1000, true, true,
100, 200, 1000, 400, 40, 30, true, true, true, 1000, true, true,
30, true, true, true, true, true, true, true, true, true);
setAllTopicPolicyProperties(entry, 10000, true);

Expand All @@ -725,28 +725,28 @@ private void testAllTopicPropertiesAppliedFilter(Set<String> properties) throws

//validate config
assertAllDestPolicyProperties(getTopic("Before"), true, true, 10,
100, 200, 1000, 400, 40, 30, true, true, 1000, true, true,
100, 200, 1000, 400, 40, 30, true, true, true, 1000, true, true,
30, true, true, true, true, true, true, true, true, true);
assertAllTopicPolicyProperties(getTopic("Before"), 10000, true);


//change config
setAllDestPolicyProperties(entry, false, false, 100,
1000, 2000, 10000, 4000, 400, 300, false, false, 1000, false, false,
1000, 2000, 10000, 4000, 400, 300, false, false, false, 1000, false, false,
300, false, false, false, false, false, false, false, false, false);
setAllTopicPolicyProperties(entry, 100000, false);

javaConfigBroker.modifyPolicyEntry(entry, false, properties);
TimeUnit.SECONDS.sleep(SLEEP);

assertAllDestPolicyProperties(getTopic("Before"), false, false, 100,
1000, 2000, 10000, 4000, 400, 300, false, false, 1000, false, false,
1000, 2000, 10000, 4000, 400, 300, false, false, false, 1000, false, false,
300, false, false, false, false, false, false, false, false, false);
assertAllTopicPolicyProperties(getTopic("Before"), 100000, false);

//check new dest
assertAllDestPolicyProperties(getTopic("After"), false, false, 100,
1000, 2000, 10000, 4000, 400, 300, false, false, 1000, false, false,
1000, 2000, 10000, 4000, 400, 300, false, false, false, 1000, false, false,
300, false, false, false, false, false, false, false, false, false);
assertAllTopicPolicyProperties(getTopic("After"), 100000, false);
}
Expand Down Expand Up @@ -820,6 +820,7 @@ private Set<String> getDestPropertySet() {
properties.add("cursorMemoryHighWaterMark");
properties.add("storeUsageHighWaterMark");
properties.add("gcInactiveDestinations");
properties.add("gcWithOnlyWildcardConsumers");
properties.add("gcWithNetworkConsumers");
properties.add("inactiveTimeoutBeforeGC");
properties.add("reduceMemoryFootprint");
Expand Down Expand Up @@ -862,12 +863,12 @@ private void setAllTopicPolicyProperties(PolicyEntry entry, long memoryLimit, bo
private void setAllDestPolicyProperties(PolicyEntry entry, boolean producerFlowControl,
boolean alwaysRetroactive, long blockedProducerWarningInterval, int maxPageSize,
int maxBrowsePageSize, long minimumMessageSize, int maxExpirePageSize, int cursorMemoryHighWaterMark,
int storeUsageHighWaterMark, boolean gcInactiveDestinations, boolean gcWithNetworkConsumers,
long inactiveTimeoutBeforeGC,boolean reduceMemoryFootprint, boolean doOptimizeMessageStore,
int optimizeMessageStoreInFlightLimit, boolean advisoryForConsumed, boolean advisoryForDelivery,
boolean advisoryForDispatched, boolean advisoryForDiscardingMessages, boolean advisoryForSlowConsumers,
boolean advisoryForFastProducers, boolean advisoryWhenFull, boolean includeBodyForAdvisory,
boolean sendAdvisoryIfNoConsumers) {
int storeUsageHighWaterMark, boolean gcInactiveDestinations, boolean gcWithOnlyWildcardConsumers,
boolean gcWithNetworkConsumers, long inactiveTimeoutBeforeGC, boolean reduceMemoryFootprint,
boolean doOptimizeMessageStore, int optimizeMessageStoreInFlightLimit, boolean advisoryForConsumed,
boolean advisoryForDelivery, boolean advisoryForDispatched, boolean advisoryForDiscardingMessages,
boolean advisoryForSlowConsumers, boolean advisoryForFastProducers, boolean advisoryWhenFull,
boolean includeBodyForAdvisory, boolean sendAdvisoryIfNoConsumers) {

entry.setProducerFlowControl(producerFlowControl);
entry.setAlwaysRetroactive(alwaysRetroactive);
Expand All @@ -879,6 +880,7 @@ private void setAllDestPolicyProperties(PolicyEntry entry, boolean producerFlowC
entry.setCursorMemoryHighWaterMark(cursorMemoryHighWaterMark);
entry.setStoreUsageHighWaterMark(storeUsageHighWaterMark);
entry.setGcInactiveDestinations(gcInactiveDestinations);
entry.setGcWithOnlyWildcardConsumers(gcWithOnlyWildcardConsumers);
entry.setGcWithNetworkConsumers(gcWithNetworkConsumers);
entry.setInactiveTimeoutBeforeGC(inactiveTimeoutBeforeGC);
entry.setReduceMemoryFootprint(reduceMemoryFootprint);
Expand Down Expand Up @@ -920,13 +922,12 @@ private void assertAllTopicPolicyProperties(Topic topic, long memoryLimit, boole
private void assertAllDestPolicyProperties(BaseDestination dest, boolean producerFlowControl,
boolean alwaysRetroactive, long blockedProducerWarningInterval, int maxPageSize,
int maxBrowsePageSize, long minimumMessageSize, int maxExpirePageSize, int cursorMemoryHighWaterMark,
int storeUsageHighWaterMark, boolean gcInactiveDestinations, boolean gcWithNetworkConsumers,
long inactiveTimeoutBeforeGC,boolean reduceMemoryFootprint, boolean doOptimizeMessageStore,
int optimizeMessageStoreInFlightLimit, boolean advisoryForConsumed, boolean advisoryForDelivery,
boolean advisoryForDispatched, boolean advisoryForDiscardingMessages, boolean advisoryForSlowConsumers,
boolean advisoryForFastProducers, boolean advisoryWhenFull, boolean includeBodyForAdvisory,
boolean sendAdvisoryIfNoConsumers) {

int storeUsageHighWaterMark, boolean gcInactiveDestinations, boolean gcWithOnlyWildcardConsumers,
boolean gcWithNetworkConsumers, long inactiveTimeoutBeforeGC, boolean reduceMemoryFootprint,
boolean doOptimizeMessageStore, int optimizeMessageStoreInFlightLimit, boolean advisoryForConsumed,
boolean advisoryForDelivery, boolean advisoryForDispatched, boolean advisoryForDiscardingMessages,
boolean advisoryForSlowConsumers, boolean advisoryForFastProducers, boolean advisoryWhenFull,
boolean includeBodyForAdvisory, boolean sendAdvisoryIfNoConsumers) {

assertEquals(producerFlowControl, dest.isProducerFlowControl());
assertEquals(alwaysRetroactive, dest.isAlwaysRetroactive());
Expand All @@ -938,6 +939,7 @@ private void assertAllDestPolicyProperties(BaseDestination dest, boolean produce
assertEquals(cursorMemoryHighWaterMark, dest.getCursorMemoryHighWaterMark());
assertEquals(storeUsageHighWaterMark, dest.getStoreUsageHighWaterMark());
assertEquals(gcInactiveDestinations, dest.isGcIfInactive());
assertEquals(gcWithOnlyWildcardConsumers, dest.isGcWithOnlyWildcardConsumers());
assertEquals(gcWithNetworkConsumers, dest.isGcWithNetworkConsumers());
assertEquals(inactiveTimeoutBeforeGC, dest.getInactiveTimeoutBeforeGC());
assertEquals(reduceMemoryFootprint, dest.isReduceMemoryFootprint());
Expand Down
Loading