Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARTEMIS-5119 Expired Messages on Cluster SNF should to to the original Expiry Queue #5327

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,10 @@ default Queue locateQueue(String queueName) {
return locateQueue(SimpleString.of(queueName));
}

default Queue locateQueue(String address, String queue) throws Exception {
return null;
}

default BindingQueryResult bindingQuery(SimpleString address) throws Exception {
return bindingQuery(address, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.BindingType;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.postoffice.impl.DivertBinding;
Expand Down Expand Up @@ -2385,6 +2386,30 @@ public Queue locateQueue(SimpleString queueName) {
return (Queue) binding.getBindable();
}

@Override
public Queue locateQueue(String address, String queue) throws Exception {
Bindings bindings = postOffice.getBindingsForAddress(SimpleString.of(address));
if (bindings == null) {
return null;
}

Binding binding = bindings.getBinding(queue);
if (binding == null) {
return null;
}

Bindable bindingContent = binding.getBindable();

if (!(bindingContent instanceof Queue)) {
if (logger.isDebugEnabled()) {
logger.debug("localQueue({}. {}) found non Queue ( {} ) on binding table, returning null instead", address, queue, bindingContent);
}
return null;
}

return (Queue) bindingContent;
}

@Deprecated
@Override
public Queue deployQueue(final SimpleString address,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,8 @@ private void checkIDSupplier(NodeStoreFactory<MessageReference> nodeStoreFactory

private final StorageManager storageManager;

private volatile AddressSettings addressSettings;
// Instead of looking up the AddressSettings every time, we cache and monitor it through onChange
private volatile AddressSettings cachedAddressSettings;

private final ActiveMQServer server;

Expand Down Expand Up @@ -733,9 +734,9 @@ public QueueImpl(final QueueConfiguration queueConfiguration,
if (addressSettingsRepository != null) {
addressSettingsRepositoryListener = new AddressSettingsRepositoryListener(addressSettingsRepository);
addressSettingsRepository.registerListener(addressSettingsRepositoryListener);
this.addressSettings = addressSettingsRepository.getMatch(getAddressSettingsMatch());
this.cachedAddressSettings = addressSettingsRepository.getMatch(getAddressSettingsMatch());
} else {
this.addressSettings = new AddressSettings();
this.cachedAddressSettings = new AddressSettings();
}

if (pageSubscription != null) {
Expand All @@ -757,9 +758,9 @@ public QueueImpl(final QueueConfiguration queueConfiguration,

this.ringSize = queueConfiguration.getRingSize() == null ? ActiveMQDefaultConfiguration.getDefaultRingSize() : queueConfiguration.getRingSize();

this.initialQueueBufferSize = this.addressSettings.getInitialQueueBufferSize() == null
this.initialQueueBufferSize = this.cachedAddressSettings.getInitialQueueBufferSize() == null
? ActiveMQDefaultConfiguration.INITIAL_QUEUE_BUFFER_SIZE
: this.addressSettings.getInitialQueueBufferSize();
: this.cachedAddressSettings.getInitialQueueBufferSize();
this.intermediateMessageReferences = new MpscUnboundedArrayQueue<>(initialQueueBufferSize);
}

Expand Down Expand Up @@ -2129,36 +2130,92 @@ public void expire(final MessageReference ref) throws Exception {
* hence no information about delivering statistics should be updated. */
@Override
public void expire(final MessageReference ref, final ServerConsumer consumer, boolean delivering) throws Exception {
if (addressSettings.getExpiryAddress() != null) {
createExpiryResources();
expire(null, ref, consumer, delivering);
}

if (logger.isTraceEnabled()) {
logger.trace("moving expired reference {} to address = {} from queue={}", ref, addressSettings.getExpiryAddress(), name);
private void expire(final Transaction tx, final MessageReference ref, final ServerConsumer consumer, boolean delivering) throws Exception {
AddressSettings settingsToUse = getMessageAddressSettings(ref.getMessage());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The expireFromScan method goes on to do "SimpleString expiryAddress = settingsToUse.getExpiryAddress();" and then use that variable in [most of] the places the value is used. Its strange for two very similar related methods right next to each other to be differing in that. Would be more readable later to either both use the variable, or neither use the variable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will merge these two methods.. I was afraid of touching something unrelated.. but it's the best choice given how it's progressed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I wasn't suggesting to combine them as they did seem sufficiently different on the backend to justify the two methods. Its just the initial bits were very similar so it would have been nice for them to be set out the same way for readability.

Combining them seems to have rearranged at least the order of some stuff for one method, or added some stuff around bindings that other method didnt do before...I'd need to take a much fuller look to see what I think of those changes, but I need to finish up just now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only difference I could see is the use of transactions or not. the use of consumers or not...

those could be set as null and they would have the same semantics... I think it should be good.

Testsuite is 100% passing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gemmellr since you mentioned you are looking to review this.. I will leave this PR open until next week.. no rush about merging this yet.. although it would be nice before next release.. but it's not happening next week anyways... so I will leave it open for now.

SimpleString expiryAddress = settingsToUse.getExpiryAddress();

if (logger.isDebugEnabled()) {
logger.debug("expire on {}/{}, consumer={}, expiryAddress={}", this.address, this.name, consumer, expiryAddress);
}

if (expiryAddress != null && expiryAddress.length() != 0) {
String messageAddress = ref.getMessage().getAddress();

if (messageAddress == null) {
// in the unlikely event where a message does not have an address stored on the message itself,
// we will get the address from the current queue
messageAddress = String.valueOf(getAddress());
}
createExpiryResources(messageAddress, settingsToUse);

move(null, addressSettings.getExpiryAddress(), null, ref, false, AckReason.EXPIRED, consumer, null, delivering);
Bindings bindingList = postOffice.lookupBindingsForAddress(expiryAddress);

if (bindingList == null || bindingList.getBindings().isEmpty()) {
if (!printErrorExpiring) {
// print this only once
ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoBindings(expiryAddress);
printErrorExpiring = true;
}
acknowledge(tx, ref, AckReason.EXPIRED, null, delivering);
} else {
move(tx, expiryAddress, null, ref, false, AckReason.EXPIRED, consumer, null, delivering);
}
} else {
logger.trace("expiry is null, just acking expired message for reference {} from queue={}", ref, name);
if (!printErrorExpiring) {
printErrorExpiring = true;
// print this only once
ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoAddress(name);
}

acknowledge(null, ref, AckReason.EXPIRED, consumer, delivering);
acknowledge(tx, ref, AckReason.EXPIRED, consumer, delivering);
}

// potentially auto-delete this queue if this expired the last message
refCountForConsumers.check();
Comment on lines -2146 to -2147
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The existing 'expire' method with consumer always did this, before checking the broker plugins. It still did this when you had added the 'expireWithScan' method. Now that you combined the methods, this is only done if there is a tx, and only after checking the broker plugins (and only if there is a message plugin, even though this is a queue-level check). These both seem strange differences from just consolidating some methods. Was one of them wrong before?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You updated the behaviour a bit, bit it still differs from before in the sense that for the combined expire method it only runs the refCountForConsumers.check(); if there is a broker message plugin, rather than always doing it before for this old expire method (the other old expire method only did it if there is a plugin). Was it wrong to always check it in this method before, i.e was the other method correct not to do it without a plugin? Essentially, why should it need a broker message plugin to do it?


if (server != null && server.hasBrokerMessagePlugins()) {
server.callBrokerMessagePlugins(plugin -> plugin.messageExpired(ref, addressSettings.getExpiryAddress(), consumer));
if (tx == null) {
// potentially auto-delete this queue if this expired the last message
refCountForConsumers.check();

server.callBrokerMessagePlugins(plugin -> plugin.messageExpired(ref, settingsToUse.getExpiryAddress(), consumer));
} else {
ExpiryLogger expiryLogger = (ExpiryLogger) tx.getProperty(TransactionPropertyIndexes.EXPIRY_LOGGER);
if (expiryLogger == null) {
expiryLogger = new ExpiryLogger();
tx.putProperty(TransactionPropertyIndexes.EXPIRY_LOGGER, expiryLogger);
tx.addOperation(expiryLogger);
}

expiryLogger.addExpiry(address, ref);

// potentially auto-delete this queue if this expired the last message
tx.addOperation(new TransactionOperationAbstract() {
@Override
public void afterCommit(Transaction tx) {
refCountForConsumers.check();
}
});
}
}
}

private AddressSettings getMessageAddressSettings(Message message) {
if (message.getAddress() == null || message.getAddress().equals(String.valueOf(address))) {
return cachedAddressSettings;
} else {
return server.getAddressSettingsRepository().getMatch(message.getAddress());
}
}

@Override
public SimpleString getExpiryAddress() {
return this.addressSettings.getExpiryAddress();
return this.cachedAddressSettings.getExpiryAddress();
}

@Override
public SimpleString getDeadLetterAddress() {
return this.addressSettings.getDeadLetterAddress();
return this.cachedAddressSettings.getDeadLetterAddress();
}

@Override
Expand Down Expand Up @@ -2510,7 +2567,7 @@ public synchronized int expireReferences(final Filter filter) throws Exception {
MessageReference ref = iter.next();
if (filter == null || filter.match(ref.getMessage())) {
incDelivering(ref);
expire(tx, ref, true);
expire(tx, ref, null, true);
iter.remove();
refRemoved(ref);
count++;
Expand Down Expand Up @@ -2543,7 +2600,7 @@ public void expireReferences(Runnable done) {
}

private boolean isExpiryDisabled() {
final SimpleString expiryAddress = addressSettings.getExpiryAddress();
final SimpleString expiryAddress = cachedAddressSettings.getExpiryAddress();
if (expiryAddress != null && expiryAddress.equals(this.address)) {
// check expire with itself would be silly (waste of time)
logger.trace("Redundant expiration from {} to {}", address, expiryAddress);
Expand Down Expand Up @@ -2637,7 +2694,7 @@ public void run() {
final Transaction tx = new TransactionImpl(storageManager);
for (MessageReference ref : expiredMessages) {
try {
expire(tx, ref, true);
expire(tx, ref, null, true);
refRemoved(ref);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorExpiringReferencesOnQueue(ref, e);
Expand Down Expand Up @@ -3598,23 +3655,23 @@ public Pair<Boolean, Boolean> checkRedelivery(final MessageReference reference,
storageManager.updateDeliveryCount(reference);
}

int maxDeliveries = addressSettings.getMaxDeliveryAttempts();
int maxDeliveries = cachedAddressSettings.getMaxDeliveryAttempts();
int deliveryCount = reference.getDeliveryCount();

// First check DLA
if (maxDeliveries > 0 && deliveryCount >= maxDeliveries) {
if (logger.isTraceEnabled()) {
logger.trace("Sending reference {} to DLA = {} since ref.getDeliveryCount={} and maxDeliveries={} from queue={}",
reference, addressSettings.getDeadLetterAddress(), reference.getDeliveryCount(), maxDeliveries, name);
reference, cachedAddressSettings.getDeadLetterAddress(), reference.getDeliveryCount(), maxDeliveries, name);
}
boolean dlaResult = sendToDeadLetterAddress(null, reference, addressSettings.getDeadLetterAddress());
boolean dlaResult = sendToDeadLetterAddress(null, reference, cachedAddressSettings.getDeadLetterAddress());

return new Pair<>(false, dlaResult);
} else {
// Second check Redelivery Delay
long redeliveryDelay = addressSettings.getRedeliveryDelay();
long redeliveryDelay = cachedAddressSettings.getRedeliveryDelay();
if (!ignoreRedeliveryDelay && redeliveryDelay > 0) {
redeliveryDelay = calculateRedeliveryDelay(addressSettings, deliveryCount);
redeliveryDelay = calculateRedeliveryDelay(cachedAddressSettings, deliveryCount);

if (logger.isTraceEnabled()) {
logger.trace("Setting redeliveryDelay={} on reference={}", redeliveryDelay, reference);
Expand Down Expand Up @@ -3896,51 +3953,6 @@ private Message makeCopy(final MessageReference ref,
return LargeServerMessageImpl.checkLargeMessage(copy, storageManager);
}

private void expire(final Transaction tx, final MessageReference ref, boolean delivering) throws Exception {
SimpleString expiryAddress = addressSettings.getExpiryAddress();

if (expiryAddress != null && expiryAddress.length() != 0) {

createExpiryResources();

Bindings bindingList = postOffice.lookupBindingsForAddress(expiryAddress);

if (bindingList == null || bindingList.getBindings().isEmpty()) {
ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoBindings(expiryAddress);
acknowledge(tx, ref, AckReason.EXPIRED, null, delivering);
} else {
move(tx, expiryAddress, null, ref, false, AckReason.EXPIRED, null, null, delivering);
}
} else {
if (!printErrorExpiring) {
printErrorExpiring = true;
// print this only once
ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoAddress(name);
}

acknowledge(tx, ref, AckReason.EXPIRED, null, delivering);
}

if (server != null && server.hasBrokerMessagePlugins()) {
ExpiryLogger expiryLogger = (ExpiryLogger)tx.getProperty(TransactionPropertyIndexes.EXPIRY_LOGGER);
if (expiryLogger == null) {
expiryLogger = new ExpiryLogger();
tx.putProperty(TransactionPropertyIndexes.EXPIRY_LOGGER, expiryLogger);
tx.addOperation(expiryLogger);
}

expiryLogger.addExpiry(address, ref);
}

// potentially auto-delete this queue if this expired the last message
tx.addOperation(new TransactionOperationAbstract() {
@Override
public void afterCommit(Transaction tx) {
refCountForConsumers.check();
}
});
}

private class ExpiryLogger extends TransactionOperationAbstract {

List<Pair<SimpleString, MessageReference>> expiries = new LinkedList<>();
Expand All @@ -3964,7 +3976,7 @@ public void afterCommit(Transaction tx) {

@Override
public boolean sendToDeadLetterAddress(final Transaction tx, final MessageReference ref) throws Exception {
return sendToDeadLetterAddress(tx, ref, addressSettings.getDeadLetterAddress());
return sendToDeadLetterAddress(tx, ref, cachedAddressSettings.getDeadLetterAddress());
}

private boolean sendToDeadLetterAddress(final Transaction tx,
Expand Down Expand Up @@ -3999,22 +4011,23 @@ private boolean sendToDeadLetterAddress(final Transaction tx,

private void createDeadLetterResources() throws Exception {
AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch(getAddress().toString());
createResources(addressSettings.isAutoCreateDeadLetterResources(), addressSettings.getDeadLetterAddress(), addressSettings.getDeadLetterQueuePrefix(), addressSettings.getDeadLetterQueueSuffix());
createResources(String.valueOf(getAddress()), addressSettings.isAutoCreateDeadLetterResources(), addressSettings.getDeadLetterAddress(), addressSettings.getDeadLetterQueuePrefix(), addressSettings.getDeadLetterQueueSuffix());
}

private void createExpiryResources() throws Exception {
AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch(getAddress().toString());
createResources(addressSettings.isAutoCreateExpiryResources(), addressSettings.getExpiryAddress(), addressSettings.getExpiryQueuePrefix(), addressSettings.getExpiryQueueSuffix());
private void createExpiryResources(String address, AddressSettings messageAddressSettings) throws Exception {
createResources(address, messageAddressSettings.isAutoCreateExpiryResources(), messageAddressSettings.getExpiryAddress(), messageAddressSettings.getExpiryQueuePrefix(), messageAddressSettings.getExpiryQueueSuffix());
}

private void createResources(boolean isAutoCreate, SimpleString destinationAddress, SimpleString prefix, SimpleString suffix) throws Exception {
if (isAutoCreate && !getAddress().equals(destinationAddress)) {
private void createResources(String address, boolean isAutoCreate, SimpleString destinationAddress, SimpleString prefix, SimpleString suffix) throws Exception {
if (isAutoCreate && !address.equals(destinationAddress)) {
if (destinationAddress != null && destinationAddress.length() != 0) {
SimpleString destinationQueueName = prefix.concat(getAddress()).concat(suffix);
SimpleString filter = SimpleString.of(String.format("%s = '%s'", Message.HDR_ORIGINAL_ADDRESS, getAddress()));
SimpleString destinationQueueName = prefix.concat(address).concat(suffix);
SimpleString filter = SimpleString.of(String.format("%s = '%s'", Message.HDR_ORIGINAL_ADDRESS, address));
try {
logger.debug("Creating Resource queue {}", destinationQueueName);
server.createQueue(QueueConfiguration.of(destinationQueueName).setAddress(destinationAddress).setFilterString(filter).setAutoCreated(true).setAutoCreateAddress(true), true);
} catch (ActiveMQQueueExistsException e) {
logger.debug("resource {} already existed, ignoring outcome", destinationQueueName);
// ignore
}
}
Expand Down Expand Up @@ -4770,7 +4783,7 @@ private long getPersistentSize(final MessageReference reference) {
}

private void configureSlowConsumerReaper() {
if (addressSettings == null || addressSettings.getSlowConsumerThreshold() == AddressSettings.DEFAULT_SLOW_CONSUMER_THRESHOLD) {
if (cachedAddressSettings == null || cachedAddressSettings.getSlowConsumerThreshold() == AddressSettings.DEFAULT_SLOW_CONSUMER_THRESHOLD) {
if (slowConsumerReaperFuture != null) {
slowConsumerReaperFuture.cancel(false);
slowConsumerReaperFuture = null;
Expand All @@ -4780,13 +4793,13 @@ private void configureSlowConsumerReaper() {
}
} else {
if (slowConsumerReaperRunnable == null) {
scheduleSlowConsumerReaper(addressSettings);
} else if (slowConsumerReaperRunnable.checkPeriod != addressSettings.getSlowConsumerCheckPeriod() || slowConsumerReaperRunnable.thresholdInMsgPerSecond != addressSettings.getSlowConsumerThreshold() || !slowConsumerReaperRunnable.policy.equals(addressSettings.getSlowConsumerPolicy())) {
scheduleSlowConsumerReaper(cachedAddressSettings);
} else if (slowConsumerReaperRunnable.checkPeriod != cachedAddressSettings.getSlowConsumerCheckPeriod() || slowConsumerReaperRunnable.thresholdInMsgPerSecond != cachedAddressSettings.getSlowConsumerThreshold() || !slowConsumerReaperRunnable.policy.equals(cachedAddressSettings.getSlowConsumerPolicy())) {
if (slowConsumerReaperFuture != null) {
slowConsumerReaperFuture.cancel(false);
slowConsumerReaperFuture = null;
}
scheduleSlowConsumerReaper(addressSettings);
scheduleSlowConsumerReaper(cachedAddressSettings);
}
}
}
Expand Down Expand Up @@ -4847,7 +4860,7 @@ private class AddressSettingsRepositoryListener implements HierarchicalRepositor

@Override
public void onChange() {
addressSettings = addressSettingsRepository.getMatch(getAddressSettingsMatch());
cachedAddressSettings = addressSettingsRepository.getMatch(getAddressSettingsMatch());
checkDeadLetterAddressAndExpiryAddress();
configureSlowConsumerReaper();
}
Expand All @@ -4863,10 +4876,10 @@ private String getAddressSettingsMatch() {

private void checkDeadLetterAddressAndExpiryAddress() {
if (!Env.isTestEnv() && !internalQueue && !address.equals(server.getConfiguration().getManagementNotificationAddress())) {
if (addressSettings.getDeadLetterAddress() == null) {
if (cachedAddressSettings.getDeadLetterAddress() == null) {
ActiveMQServerLogger.LOGGER.AddressSettingsNoDLA(name);
}
if (addressSettings.getExpiryAddress() == null) {
if (cachedAddressSettings.getExpiryAddress() == null) {
ActiveMQServerLogger.LOGGER.AddressSettingsNoExpiryAddress(name);
}
}
Expand Down
Loading