Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Suggested changes to AMQ-8463 #1

Merged
merged 1 commit into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import jakarta.jms.InvalidSelectorException;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
Expand All @@ -53,6 +55,7 @@
import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.management.MessageFlowStats;
import org.apache.activemq.management.UnsampledStatistic;
import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.util.URISupport;
Expand Down Expand Up @@ -633,55 +636,51 @@ public void setAdvancedMessageStatisticsEnabled(boolean advancedMessageStatistic

@Override
public long getEnqueuedMessageBrokerInTime() {
MessageFlowStats tmpMessageFlowStats = destination.getDestinationStatistics().getMessageFlowStats();
return (tmpMessageFlowStats != null ? tmpMessageFlowStats.getEnqueuedMessageBrokerInTime().getValue() : 0l);
return getMessageFlowStat(MessageFlowStats::getEnqueuedMessageBrokerInTime, 0L);
}

@Override
public String getEnqueuedMessageClientId() {
MessageFlowStats tmpMessageFlowStats = destination.getDestinationStatistics().getMessageFlowStats();
return (tmpMessageFlowStats != null ? tmpMessageFlowStats.getEnqueuedMessageClientID().getValue() : null);
return getMessageFlowStat(MessageFlowStats::getEnqueuedMessageClientID, null);
}

@Override
public String getEnqueuedMessageId() {
MessageFlowStats tmpMessageFlowStats = destination.getDestinationStatistics().getMessageFlowStats();
return (tmpMessageFlowStats != null ? tmpMessageFlowStats.getEnqueuedMessageID().getValue() : null);
return getMessageFlowStat(MessageFlowStats::getEnqueuedMessageID, null);
}

@Override
public long getEnqueuedMessageTimestamp() {
MessageFlowStats tmpMessageFlowStats = destination.getDestinationStatistics().getMessageFlowStats();
return (tmpMessageFlowStats != null ? tmpMessageFlowStats.getEnqueuedMessageTimestamp().getValue() : 0l);
return getMessageFlowStat(MessageFlowStats::getEnqueuedMessageTimestamp, 0L);
}

@Override
public long getDequeuedMessageBrokerInTime() {
MessageFlowStats tmpMessageFlowStats = destination.getDestinationStatistics().getMessageFlowStats();
return (tmpMessageFlowStats != null ? tmpMessageFlowStats.getDequeuedMessageBrokerInTime().getValue() : 0l);
return getMessageFlowStat(MessageFlowStats::getDequeuedMessageBrokerInTime, 0L);
}

@Override
public long getDequeuedMessageBrokerOutTime() {
MessageFlowStats tmpMessageFlowStats = destination.getDestinationStatistics().getMessageFlowStats();
return (tmpMessageFlowStats != null ? tmpMessageFlowStats.getDequeuedMessageBrokerOutTime().getValue() : 0l);
return getMessageFlowStat(MessageFlowStats::getDequeuedMessageBrokerOutTime, 0L);
}

@Override
public String getDequeuedMessageClientId() {
MessageFlowStats tmpMessageFlowStats = destination.getDestinationStatistics().getMessageFlowStats();
return (tmpMessageFlowStats != null ? tmpMessageFlowStats.getDequeuedMessageClientID().getValue() : null);
return getMessageFlowStat(MessageFlowStats::getDequeuedMessageClientID, null);
}

@Override
public String getDequeuedMessageId() {
MessageFlowStats tmpMessageFlowStats = destination.getDestinationStatistics().getMessageFlowStats();
return (tmpMessageFlowStats != null ? tmpMessageFlowStats.getDequeuedMessageID().getValue() : null);
return getMessageFlowStat(MessageFlowStats::getDequeuedMessageID, null);
}

@Override
public long getDequeuedMessageTimestamp() {
MessageFlowStats tmpMessageFlowStats = destination.getDestinationStatistics().getMessageFlowStats();
return (tmpMessageFlowStats != null ? tmpMessageFlowStats.getDequeuedMessageTimestamp().getValue() : 0l);
return getMessageFlowStat(MessageFlowStats::getDequeuedMessageTimestamp, 0L);
}

private <T> T getMessageFlowStat(Function<MessageFlowStats, UnsampledStatistic<T>> f, T defVal) {
final var stats = destination.getDestinationStatistics().getMessageFlowStats();
return stats != null ? f.apply(stats).getValue() : defVal;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ public abstract class BaseDestination implements Destination {
private boolean disposed = false;
private boolean doOptimzeMessageStorage = true;
private boolean advancedNetworkStatisticsEnabled = false;
private boolean advancedMessageStatisticsEnabled = false;

/*
* percentage of in-flight messages above which optimize message store is disabled
Expand Down Expand Up @@ -828,7 +827,7 @@ public boolean isGcWithNetworkConsumers() {
@Override
public void markForGC(long timeStamp) {
if (isGcIfInactive() && this.lastActiveTime == 0 && isActive() == false
&& destinationStatistics.messages.getCount() == 0 && getInactiveTimeoutBeforeGC() > 0l) {
&& destinationStatistics.getMessages().getCount() == 0 && getInactiveTimeoutBeforeGC() > 0l) {
this.lastActiveTime = timeStamp;
}
}
Expand All @@ -837,7 +836,7 @@ public void markForGC(long timeStamp) {
public boolean canGC() {
boolean result = false;
final long currentLastActiveTime = this.lastActiveTime;
if (isGcIfInactive() && currentLastActiveTime != 0l && destinationStatistics.messages.getCount() == 0L ) {
if (isGcIfInactive() && currentLastActiveTime != 0l && destinationStatistics.getMessages().getCount() == 0L ) {
if ((System.currentTimeMillis() - currentLastActiveTime) >= getInactiveTimeoutBeforeGC()) {
result = true;
}
Expand Down Expand Up @@ -883,12 +882,11 @@ public void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatistic

@Override
public boolean isAdvancedMessageStatisticsEnabled() {
return this.advancedMessageStatisticsEnabled;
return this.destinationStatistics.isAdvancedMessageStatisticsEnabled();
}

@Override
public void setAdvancedMessageStatisticsEnabled(boolean advancedMessageStatisticsEnabled) {
this.advancedMessageStatisticsEnabled = advancedMessageStatisticsEnabled;
this.destinationStatistics.setAdvancedMessageStatisticsEnabled(advancedMessageStatisticsEnabled);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.activemq.broker.region;

import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.management.CountStatisticImpl;
import org.apache.activemq.management.PollCountStatisticImpl;
import org.apache.activemq.management.StatsImpl;
Expand All @@ -27,30 +29,29 @@
*/
public class DestinationStatistics extends StatsImpl {

protected final CountStatisticImpl enqueues;
protected final CountStatisticImpl dequeues;
protected final CountStatisticImpl forwards;
protected final CountStatisticImpl consumers;
protected final CountStatisticImpl producers;
protected final CountStatisticImpl messages;
protected PollCountStatisticImpl messagesCached;
protected final CountStatisticImpl dispatched;
protected final CountStatisticImpl duplicateFromStore;
protected final CountStatisticImpl inflight;
protected final CountStatisticImpl expired;
protected final TimeStatisticImpl processTime;
protected final CountStatisticImpl blockedSends;
protected final TimeStatisticImpl blockedTime;
protected final SizeStatisticImpl messageSize;
protected final CountStatisticImpl maxUncommittedExceededCount;
private final CountStatisticImpl enqueues;
private final CountStatisticImpl dequeues;
private final CountStatisticImpl forwards;
private final CountStatisticImpl consumers;
private final CountStatisticImpl producers;
private final CountStatisticImpl messages;
private final PollCountStatisticImpl messagesCached;
private final CountStatisticImpl dispatched;
private final CountStatisticImpl duplicateFromStore;
private final CountStatisticImpl inflight;
private final CountStatisticImpl expired;
private final TimeStatisticImpl processTime;
private final CountStatisticImpl blockedSends;
private final TimeStatisticImpl blockedTime;
private final SizeStatisticImpl messageSize;
private final CountStatisticImpl maxUncommittedExceededCount;

// [AMQ-9437] Advanced Network Statistics
protected final CountStatisticImpl networkEnqueues;
protected final CountStatisticImpl networkDequeues;
private final CountStatisticImpl networkEnqueues;
private final CountStatisticImpl networkDequeues;

// [AMQ-8463] Advanced Message Statistics are disabled by default
protected final AtomicBoolean advancedMessageStatisticsEnabled = new AtomicBoolean(false);
protected volatile MessageFlowStatsImpl messageFlowStats;
private final AtomicReference<MessageFlowStatsImpl> messageFlowStats = new AtomicReference<>();

public DestinationStatistics() {

Expand All @@ -77,10 +78,6 @@ public DestinationStatistics() {

networkEnqueues = new CountStatisticImpl("networkEnqueues", "The number of messages that have been sent to the destination via network connection");
networkDequeues = new CountStatisticImpl("networkDequeues", "The number of messages that have been acknowledged from the destination via network connection");

if(advancedMessageStatisticsEnabled.get()) {
messageFlowStats = new MessageFlowStatsImpl();
}
}

public CountStatisticImpl getEnqueues() {
Expand Down Expand Up @@ -119,10 +116,6 @@ public CountStatisticImpl getMessages() {
return messages;
}

public void setMessagesCached(PollCountStatisticImpl messagesCached) {
this.messagesCached = messagesCached;
}

public CountStatisticImpl getDispatched() {
return dispatched;
}
Expand Down Expand Up @@ -158,7 +151,7 @@ public CountStatisticImpl getNetworkDequeues() {
}

public MessageFlowStats getMessageFlowStats() {
return messageFlowStats;
return messageFlowStats.get();
}

public void reset() {
Expand All @@ -177,11 +170,8 @@ public void reset() {
maxUncommittedExceededCount.reset();
networkEnqueues.reset();
networkDequeues.reset();

MessageFlowStatsImpl tmpMessageFlowStats = messageFlowStats;
if(advancedMessageStatisticsEnabled.get() && tmpMessageFlowStats != null) {
tmpMessageFlowStats.reset();
}
Optional.ofNullable(messageFlowStats.get())
.ifPresent(MessageFlowStatsImpl::reset);
}
}

Expand Down Expand Up @@ -209,10 +199,8 @@ public void setEnabled(boolean enabled) {
networkDequeues.setEnabled(enabled);

// [AMQ-9437] Advanced Message Statistics
MessageFlowStatsImpl tmpMessageFlowStats = messageFlowStats;
if(tmpMessageFlowStats != null) {
tmpMessageFlowStats.setEnabled(enabled);
}
Optional.ofNullable(messageFlowStats.get())
.ifPresent(stats -> stats.setEnabled(enabled));
}

public void setParent(DestinationStatistics parent) {
Expand Down Expand Up @@ -259,18 +247,20 @@ public void setParent(DestinationStatistics parent) {
}
}

public synchronized void setAdvancedMessageStatisticsEnabled(boolean advancedMessageStatisticsEnabled) {
if(advancedMessageStatisticsEnabled) {
this.messageFlowStats = new MessageFlowStatsImpl();
this.messageFlowStats.setEnabled(enabled);
this.advancedMessageStatisticsEnabled.set(advancedMessageStatisticsEnabled);
// This is the only method that can mutate the messageFlowStats state
// so we only need to synchronize this method to make sure we don't have 2 threads
// trying to set at the same time.
public synchronized void setAdvancedMessageStatisticsEnabled(boolean enabled) {
// we can just use the get() here on the reference as no other spot can
// set the reference so there is no race condition
if(enabled && this.messageFlowStats.get() == null) {
this.messageFlowStats.set(new MessageFlowStatsImpl());
} else {
this.advancedMessageStatisticsEnabled.set(advancedMessageStatisticsEnabled);
this.messageFlowStats = null;
this.messageFlowStats.set(null);
}
}

public synchronized boolean isAdvancedMessageStatisticsEnabled() {
return this.advancedMessageStatisticsEnabled.get();
public boolean isAdvancedMessageStatisticsEnabled() {
return this.messageFlowStats.get() != null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -376,8 +376,8 @@ public void afterRollback() throws Exception {
((Destination)node.getRegionDestination()).getDestinationStatistics().getNetworkDequeues().add(ack.getMessageCount());
}

MessageFlowStats tmpMessageFlowStats = ((Destination)node.getRegionDestination()).getDestinationStatistics().getMessageFlowStats();
if(((Destination)node.getRegionDestination()).isAdvancedMessageStatisticsEnabled() && tmpMessageFlowStats != null) {
final MessageFlowStats tmpMessageFlowStats = ((Destination)node.getRegionDestination()).getDestinationStatistics().getMessageFlowStats();
if(tmpMessageFlowStats != null) {
tmpMessageFlowStats.dequeueStats(context.getClientId(), node.getMessageId().toString(), node.getMessage().getTimestamp(), node.getMessage().getBrokerInTime(), node.getMessage().getBrokerOutTime());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1917,8 +1917,8 @@ private void dropMessage(ConnectionContext context, QueueMessageReference refere
destinationStatistics.getDequeues().increment();
destinationStatistics.getMessages().decrement();

MessageFlowStats tmpMessageFlowStats = destinationStatistics.getMessageFlowStats();
if(isAdvancedMessageStatisticsEnabled() && tmpMessageFlowStats != null) {
final var tmpMessageFlowStats = destinationStatistics.getMessageFlowStats();
if(tmpMessageFlowStats != null) {
Message tmpMessage = reference.getMessage();
tmpMessageFlowStats.dequeueStats(context.getClientId(), tmpMessage.getMessageId().toString(), tmpMessage.getTimestamp(), tmpMessage.getBrokerInTime(), tmpMessage.getBrokerOutTime());
}
Expand Down Expand Up @@ -1983,9 +1983,8 @@ final void messageSent(final ConnectionContext context, final Message msg) throw
destinationStatistics.getMessages().increment();
destinationStatistics.getMessageSize().addSize(msg.getSize());

MessageFlowStats tmpMessageFlowStats = destinationStatistics.getMessageFlowStats();

if(isAdvancedMessageStatisticsEnabled() && tmpMessageFlowStats != null) {
final var tmpMessageFlowStats = destinationStatistics.getMessageFlowStats();
if(tmpMessageFlowStats != null) {
tmpMessageFlowStats.enqueueStats(context.getClientId(), msg.getMessageId().toString(), msg.getTimestamp(), msg.getBrokerInTime());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -780,8 +780,8 @@ protected void dispatch(final ConnectionContext context, Message message) throws
// destinationStatistics.getMessages().increment();
destinationStatistics.getEnqueues().increment();

MessageFlowStats tmpMessageFlowStats = destinationStatistics.getMessageFlowStats();
if(isAdvancedMessageStatisticsEnabled() && tmpMessageFlowStats != null) {
final var tmpMessageFlowStats = destinationStatistics.getMessageFlowStats();
if(tmpMessageFlowStats != null) {
tmpMessageFlowStats.enqueueStats(context.getClientId(), message.getMessageId().toString(), message.getTimestamp(), message.getBrokerInTime());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -455,8 +455,8 @@ private void incrementStatsOnAck(final Destination destination, final MessageAck
}
}

MessageFlowStats tmpMessageFlowStats = destination.getDestinationStatistics().getMessageFlowStats();
if(destination.isAdvancedMessageStatisticsEnabled() && tmpMessageFlowStats != null) {
final var tmpMessageFlowStats = destination.getDestinationStatistics().getMessageFlowStats();
if(tmpMessageFlowStats != null) {
tmpMessageFlowStats.dequeueStats(context.getClientId(), ack.getLastMessageId().toString());
}
if (ack.isExpiredAck()) {
Expand Down