Skip to content

Commit

Permalink
Merge pull request #1 from cshannon/AMQ-8463-updates
Browse files Browse the repository at this point in the history
Suggested changes to AMQ-8463
  • Loading branch information
mattrpav authored Jan 17, 2025
2 parents 03b3b1f + 2084df8 commit 346d308
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import jakarta.jms.InvalidSelectorException;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
Expand All @@ -53,6 +55,7 @@
import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.management.MessageFlowStats;
import org.apache.activemq.management.UnsampledStatistic;
import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.util.URISupport;
Expand Down Expand Up @@ -633,55 +636,51 @@ public void setAdvancedMessageStatisticsEnabled(boolean advancedMessageStatistic

@Override
public long getEnqueuedMessageBrokerInTime() {
MessageFlowStats tmpMessageFlowStats = destination.getDestinationStatistics().getMessageFlowStats();
return (tmpMessageFlowStats != null ? tmpMessageFlowStats.getEnqueuedMessageBrokerInTime().getValue() : 0l);
return getMessageFlowStat(MessageFlowStats::getEnqueuedMessageBrokerInTime, 0L);
}

@Override
public String getEnqueuedMessageClientId() {
MessageFlowStats tmpMessageFlowStats = destination.getDestinationStatistics().getMessageFlowStats();
return (tmpMessageFlowStats != null ? tmpMessageFlowStats.getEnqueuedMessageClientID().getValue() : null);
return getMessageFlowStat(MessageFlowStats::getEnqueuedMessageClientID, null);
}

@Override
public String getEnqueuedMessageId() {
MessageFlowStats tmpMessageFlowStats = destination.getDestinationStatistics().getMessageFlowStats();
return (tmpMessageFlowStats != null ? tmpMessageFlowStats.getEnqueuedMessageID().getValue() : null);
return getMessageFlowStat(MessageFlowStats::getEnqueuedMessageID, null);
}

@Override
public long getEnqueuedMessageTimestamp() {
MessageFlowStats tmpMessageFlowStats = destination.getDestinationStatistics().getMessageFlowStats();
return (tmpMessageFlowStats != null ? tmpMessageFlowStats.getEnqueuedMessageTimestamp().getValue() : 0l);
return getMessageFlowStat(MessageFlowStats::getEnqueuedMessageTimestamp, 0L);
}

@Override
public long getDequeuedMessageBrokerInTime() {
MessageFlowStats tmpMessageFlowStats = destination.getDestinationStatistics().getMessageFlowStats();
return (tmpMessageFlowStats != null ? tmpMessageFlowStats.getDequeuedMessageBrokerInTime().getValue() : 0l);
return getMessageFlowStat(MessageFlowStats::getDequeuedMessageBrokerInTime, 0L);
}

@Override
public long getDequeuedMessageBrokerOutTime() {
MessageFlowStats tmpMessageFlowStats = destination.getDestinationStatistics().getMessageFlowStats();
return (tmpMessageFlowStats != null ? tmpMessageFlowStats.getDequeuedMessageBrokerOutTime().getValue() : 0l);
return getMessageFlowStat(MessageFlowStats::getDequeuedMessageBrokerOutTime, 0L);
}

@Override
public String getDequeuedMessageClientId() {
MessageFlowStats tmpMessageFlowStats = destination.getDestinationStatistics().getMessageFlowStats();
return (tmpMessageFlowStats != null ? tmpMessageFlowStats.getDequeuedMessageClientID().getValue() : null);
return getMessageFlowStat(MessageFlowStats::getDequeuedMessageClientID, null);
}

@Override
public String getDequeuedMessageId() {
MessageFlowStats tmpMessageFlowStats = destination.getDestinationStatistics().getMessageFlowStats();
return (tmpMessageFlowStats != null ? tmpMessageFlowStats.getDequeuedMessageID().getValue() : null);
return getMessageFlowStat(MessageFlowStats::getDequeuedMessageID, null);
}

@Override
public long getDequeuedMessageTimestamp() {
MessageFlowStats tmpMessageFlowStats = destination.getDestinationStatistics().getMessageFlowStats();
return (tmpMessageFlowStats != null ? tmpMessageFlowStats.getDequeuedMessageTimestamp().getValue() : 0l);
return getMessageFlowStat(MessageFlowStats::getDequeuedMessageTimestamp, 0L);
}

private <T> T getMessageFlowStat(Function<MessageFlowStats, UnsampledStatistic<T>> f, T defVal) {
final var stats = destination.getDestinationStatistics().getMessageFlowStats();
return stats != null ? f.apply(stats).getValue() : defVal;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ public abstract class BaseDestination implements Destination {
private boolean disposed = false;
private boolean doOptimzeMessageStorage = true;
private boolean advancedNetworkStatisticsEnabled = false;
private boolean advancedMessageStatisticsEnabled = false;

/*
* percentage of in-flight messages above which optimize message store is disabled
Expand Down Expand Up @@ -828,7 +827,7 @@ public boolean isGcWithNetworkConsumers() {
@Override
public void markForGC(long timeStamp) {
if (isGcIfInactive() && this.lastActiveTime == 0 && isActive() == false
&& destinationStatistics.messages.getCount() == 0 && getInactiveTimeoutBeforeGC() > 0l) {
&& destinationStatistics.getMessages().getCount() == 0 && getInactiveTimeoutBeforeGC() > 0l) {
this.lastActiveTime = timeStamp;
}
}
Expand All @@ -837,7 +836,7 @@ public void markForGC(long timeStamp) {
public boolean canGC() {
boolean result = false;
final long currentLastActiveTime = this.lastActiveTime;
if (isGcIfInactive() && currentLastActiveTime != 0l && destinationStatistics.messages.getCount() == 0L ) {
if (isGcIfInactive() && currentLastActiveTime != 0l && destinationStatistics.getMessages().getCount() == 0L ) {
if ((System.currentTimeMillis() - currentLastActiveTime) >= getInactiveTimeoutBeforeGC()) {
result = true;
}
Expand Down Expand Up @@ -883,12 +882,11 @@ public void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatistic

@Override
public boolean isAdvancedMessageStatisticsEnabled() {
return this.advancedMessageStatisticsEnabled;
return this.destinationStatistics.isAdvancedMessageStatisticsEnabled();
}

@Override
public void setAdvancedMessageStatisticsEnabled(boolean advancedMessageStatisticsEnabled) {
this.advancedMessageStatisticsEnabled = advancedMessageStatisticsEnabled;
this.destinationStatistics.setAdvancedMessageStatisticsEnabled(advancedMessageStatisticsEnabled);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.activemq.broker.region;

import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.management.CountStatisticImpl;
import org.apache.activemq.management.PollCountStatisticImpl;
import org.apache.activemq.management.StatsImpl;
Expand All @@ -27,30 +29,29 @@
*/
public class DestinationStatistics extends StatsImpl {

protected final CountStatisticImpl enqueues;
protected final CountStatisticImpl dequeues;
protected final CountStatisticImpl forwards;
protected final CountStatisticImpl consumers;
protected final CountStatisticImpl producers;
protected final CountStatisticImpl messages;
protected PollCountStatisticImpl messagesCached;
protected final CountStatisticImpl dispatched;
protected final CountStatisticImpl duplicateFromStore;
protected final CountStatisticImpl inflight;
protected final CountStatisticImpl expired;
protected final TimeStatisticImpl processTime;
protected final CountStatisticImpl blockedSends;
protected final TimeStatisticImpl blockedTime;
protected final SizeStatisticImpl messageSize;
protected final CountStatisticImpl maxUncommittedExceededCount;
private final CountStatisticImpl enqueues;
private final CountStatisticImpl dequeues;
private final CountStatisticImpl forwards;
private final CountStatisticImpl consumers;
private final CountStatisticImpl producers;
private final CountStatisticImpl messages;
private final PollCountStatisticImpl messagesCached;
private final CountStatisticImpl dispatched;
private final CountStatisticImpl duplicateFromStore;
private final CountStatisticImpl inflight;
private final CountStatisticImpl expired;
private final TimeStatisticImpl processTime;
private final CountStatisticImpl blockedSends;
private final TimeStatisticImpl blockedTime;
private final SizeStatisticImpl messageSize;
private final CountStatisticImpl maxUncommittedExceededCount;

// [AMQ-9437] Advanced Network Statistics
protected final CountStatisticImpl networkEnqueues;
protected final CountStatisticImpl networkDequeues;
private final CountStatisticImpl networkEnqueues;
private final CountStatisticImpl networkDequeues;

// [AMQ-8463] Advanced Message Statistics are disabled by default
protected final AtomicBoolean advancedMessageStatisticsEnabled = new AtomicBoolean(false);
protected volatile MessageFlowStatsImpl messageFlowStats;
private final AtomicReference<MessageFlowStatsImpl> messageFlowStats = new AtomicReference<>();

public DestinationStatistics() {

Expand All @@ -77,10 +78,6 @@ public DestinationStatistics() {

networkEnqueues = new CountStatisticImpl("networkEnqueues", "The number of messages that have been sent to the destination via network connection");
networkDequeues = new CountStatisticImpl("networkDequeues", "The number of messages that have been acknowledged from the destination via network connection");

if(advancedMessageStatisticsEnabled.get()) {
messageFlowStats = new MessageFlowStatsImpl();
}
}

public CountStatisticImpl getEnqueues() {
Expand Down Expand Up @@ -119,10 +116,6 @@ public CountStatisticImpl getMessages() {
return messages;
}

public void setMessagesCached(PollCountStatisticImpl messagesCached) {
this.messagesCached = messagesCached;
}

public CountStatisticImpl getDispatched() {
return dispatched;
}
Expand Down Expand Up @@ -158,7 +151,7 @@ public CountStatisticImpl getNetworkDequeues() {
}

public MessageFlowStats getMessageFlowStats() {
return messageFlowStats;
return messageFlowStats.get();
}

public void reset() {
Expand All @@ -177,11 +170,8 @@ public void reset() {
maxUncommittedExceededCount.reset();
networkEnqueues.reset();
networkDequeues.reset();

MessageFlowStatsImpl tmpMessageFlowStats = messageFlowStats;
if(advancedMessageStatisticsEnabled.get() && tmpMessageFlowStats != null) {
tmpMessageFlowStats.reset();
}
Optional.ofNullable(messageFlowStats.get())
.ifPresent(MessageFlowStatsImpl::reset);
}
}

Expand Down Expand Up @@ -209,10 +199,8 @@ public void setEnabled(boolean enabled) {
networkDequeues.setEnabled(enabled);

// [AMQ-9437] Advanced Message Statistics
MessageFlowStatsImpl tmpMessageFlowStats = messageFlowStats;
if(tmpMessageFlowStats != null) {
tmpMessageFlowStats.setEnabled(enabled);
}
Optional.ofNullable(messageFlowStats.get())
.ifPresent(stats -> stats.setEnabled(enabled));
}

public void setParent(DestinationStatistics parent) {
Expand Down Expand Up @@ -259,18 +247,20 @@ public void setParent(DestinationStatistics parent) {
}
}

public synchronized void setAdvancedMessageStatisticsEnabled(boolean advancedMessageStatisticsEnabled) {
if(advancedMessageStatisticsEnabled) {
this.messageFlowStats = new MessageFlowStatsImpl();
this.messageFlowStats.setEnabled(enabled);
this.advancedMessageStatisticsEnabled.set(advancedMessageStatisticsEnabled);
// This is the only method that can mutate the messageFlowStats state
// so we only need to synchronize this method to make sure we don't have 2 threads
// trying to set at the same time.
public synchronized void setAdvancedMessageStatisticsEnabled(boolean enabled) {
// we can just use the get() here on the reference as no other spot can
// set the reference so there is no race condition
if(enabled && this.messageFlowStats.get() == null) {
this.messageFlowStats.set(new MessageFlowStatsImpl());
} else {
this.advancedMessageStatisticsEnabled.set(advancedMessageStatisticsEnabled);
this.messageFlowStats = null;
this.messageFlowStats.set(null);
}
}

public synchronized boolean isAdvancedMessageStatisticsEnabled() {
return this.advancedMessageStatisticsEnabled.get();
public boolean isAdvancedMessageStatisticsEnabled() {
return this.messageFlowStats.get() != null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -376,8 +376,8 @@ public void afterRollback() throws Exception {
((Destination)node.getRegionDestination()).getDestinationStatistics().getNetworkDequeues().add(ack.getMessageCount());
}

MessageFlowStats tmpMessageFlowStats = ((Destination)node.getRegionDestination()).getDestinationStatistics().getMessageFlowStats();
if(((Destination)node.getRegionDestination()).isAdvancedMessageStatisticsEnabled() && tmpMessageFlowStats != null) {
final MessageFlowStats tmpMessageFlowStats = ((Destination)node.getRegionDestination()).getDestinationStatistics().getMessageFlowStats();
if(tmpMessageFlowStats != null) {
tmpMessageFlowStats.dequeueStats(context.getClientId(), node.getMessageId().toString(), node.getMessage().getTimestamp(), node.getMessage().getBrokerInTime(), node.getMessage().getBrokerOutTime());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1917,8 +1917,8 @@ private void dropMessage(ConnectionContext context, QueueMessageReference refere
destinationStatistics.getDequeues().increment();
destinationStatistics.getMessages().decrement();

MessageFlowStats tmpMessageFlowStats = destinationStatistics.getMessageFlowStats();
if(isAdvancedMessageStatisticsEnabled() && tmpMessageFlowStats != null) {
final var tmpMessageFlowStats = destinationStatistics.getMessageFlowStats();
if(tmpMessageFlowStats != null) {
Message tmpMessage = reference.getMessage();
tmpMessageFlowStats.dequeueStats(context.getClientId(), tmpMessage.getMessageId().toString(), tmpMessage.getTimestamp(), tmpMessage.getBrokerInTime(), tmpMessage.getBrokerOutTime());
}
Expand Down Expand Up @@ -1983,9 +1983,8 @@ final void messageSent(final ConnectionContext context, final Message msg) throw
destinationStatistics.getMessages().increment();
destinationStatistics.getMessageSize().addSize(msg.getSize());

MessageFlowStats tmpMessageFlowStats = destinationStatistics.getMessageFlowStats();

if(isAdvancedMessageStatisticsEnabled() && tmpMessageFlowStats != null) {
final var tmpMessageFlowStats = destinationStatistics.getMessageFlowStats();
if(tmpMessageFlowStats != null) {
tmpMessageFlowStats.enqueueStats(context.getClientId(), msg.getMessageId().toString(), msg.getTimestamp(), msg.getBrokerInTime());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -780,8 +780,8 @@ protected void dispatch(final ConnectionContext context, Message message) throws
// destinationStatistics.getMessages().increment();
destinationStatistics.getEnqueues().increment();

MessageFlowStats tmpMessageFlowStats = destinationStatistics.getMessageFlowStats();
if(isAdvancedMessageStatisticsEnabled() && tmpMessageFlowStats != null) {
final var tmpMessageFlowStats = destinationStatistics.getMessageFlowStats();
if(tmpMessageFlowStats != null) {
tmpMessageFlowStats.enqueueStats(context.getClientId(), message.getMessageId().toString(), message.getTimestamp(), message.getBrokerInTime());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -455,8 +455,8 @@ private void incrementStatsOnAck(final Destination destination, final MessageAck
}
}

MessageFlowStats tmpMessageFlowStats = destination.getDestinationStatistics().getMessageFlowStats();
if(destination.isAdvancedMessageStatisticsEnabled() && tmpMessageFlowStats != null) {
final var tmpMessageFlowStats = destination.getDestinationStatistics().getMessageFlowStats();
if(tmpMessageFlowStats != null) {
tmpMessageFlowStats.dequeueStats(context.getClientId(), ack.getLastMessageId().toString());
}
if (ack.isExpiredAck()) {
Expand Down

0 comments on commit 346d308

Please sign in to comment.