Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
mattrpav committed Dec 12, 2024
1 parent 5ba17f5 commit df08462
Show file tree
Hide file tree
Showing 13 changed files with 321 additions and 236 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.activemq.command.Message;
import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.management.MessageFlowStats;
import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.util.URISupport;
Expand Down Expand Up @@ -632,47 +633,55 @@ public void setAdvancedMessageStatisticsEnabled(boolean advancedMessageStatistic

@Override
public long getEnqueuedMessageBrokerInTime() {
return destination.getDestinationStatistics().getEnqueuedMessageBrokerInTime().getValue();
MessageFlowStats tmpMessageFlowStats = destination.getDestinationStatistics().getMessageFlowStats();
return (tmpMessageFlowStats != null ? tmpMessageFlowStats.getEnqueuedMessageBrokerInTime().getValue() : 0l);
}

@Override
public String getEnqueuedMessageClientId() {
return destination.getDestinationStatistics().getEnqueuedMessageClientID().getValue();
MessageFlowStats tmpMessageFlowStats = destination.getDestinationStatistics().getMessageFlowStats();
return (tmpMessageFlowStats != null ? tmpMessageFlowStats.getEnqueuedMessageClientID().getValue() : null);
}

@Override
public String getEnqueuedMessageId() {
return destination.getDestinationStatistics().getEnqueuedMessageID().getValue();
MessageFlowStats tmpMessageFlowStats = destination.getDestinationStatistics().getMessageFlowStats();
return (tmpMessageFlowStats != null ? tmpMessageFlowStats.getEnqueuedMessageID().getValue() : null);
}

@Override
public long getEnqueuedMessageTimestamp() {
return destination.getDestinationStatistics().getEnqueuedMessageTimestamp().getValue();
MessageFlowStats tmpMessageFlowStats = destination.getDestinationStatistics().getMessageFlowStats();
return (tmpMessageFlowStats != null ? tmpMessageFlowStats.getEnqueuedMessageTimestamp().getValue() : 0l);
}

@Override
public long getDequeuedMessageBrokerInTime() {
return destination.getDestinationStatistics().getDequeuedMessageBrokerInTime().getValue();
MessageFlowStats tmpMessageFlowStats = destination.getDestinationStatistics().getMessageFlowStats();
return (tmpMessageFlowStats != null ? tmpMessageFlowStats.getDequeuedMessageBrokerInTime().getValue() : 0l);
}

@Override
public long getDequeuedMessageBrokerOutTime() {
return destination.getDestinationStatistics().getDequeuedMessageBrokerOutTime().getValue();
MessageFlowStats tmpMessageFlowStats = destination.getDestinationStatistics().getMessageFlowStats();
return (tmpMessageFlowStats != null ? tmpMessageFlowStats.getDequeuedMessageBrokerOutTime().getValue() : 0l);
}

@Override
public String getDequeuedMessageClientId() {
return destination.getDestinationStatistics().getDequeuedMessageClientID().getValue();
MessageFlowStats tmpMessageFlowStats = destination.getDestinationStatistics().getMessageFlowStats();
return (tmpMessageFlowStats != null ? tmpMessageFlowStats.getDequeuedMessageClientID().getValue() : null);
}

@Override
public String getDequeuedMessageId() {
return destination.getDestinationStatistics().getDequeuedMessageID().getValue();
MessageFlowStats tmpMessageFlowStats = destination.getDestinationStatistics().getMessageFlowStats();
return (tmpMessageFlowStats != null ? tmpMessageFlowStats.getDequeuedMessageID().getValue() : null);
}

@Override
public long getDequeuedMessageTimestamp() {
return destination.getDestinationStatistics().getDequeuedMessageTimestamp().getValue();
MessageFlowStats tmpMessageFlowStats = destination.getDestinationStatistics().getMessageFlowStats();
return (tmpMessageFlowStats != null ? tmpMessageFlowStats.getDequeuedMessageTimestamp().getValue() : 0l);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -879,7 +879,6 @@ public boolean isAdvancedNetworkStatisticsEnabled() {
@Override
public void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatisticsEnabled) {
this.advancedNetworkStatisticsEnabled = advancedNetworkStatisticsEnabled;
this.destinationStatistics.setAdvancedStatisticsEnabled(advancedNetworkStatisticsEnabled);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,58 +14,43 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.activemq.broker.region;

import org.apache.activemq.management.CountStatisticImpl;
import org.apache.activemq.management.PollCountStatisticImpl;
import org.apache.activemq.management.StatsImpl;

import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.activemq.management.*;

/**
* The J2EE Statistics for the a Destination.
*
*
* The Statistics for a Destination.
*/
public class DestinationStatistics extends StatsImpl {

protected CountStatisticImpl enqueues;
protected CountStatisticImpl dequeues;
protected CountStatisticImpl forwards;
protected CountStatisticImpl consumers;
protected CountStatisticImpl producers;
protected CountStatisticImpl messages;
protected final CountStatisticImpl enqueues;
protected final CountStatisticImpl dequeues;
protected final CountStatisticImpl forwards;
protected final CountStatisticImpl consumers;
protected final CountStatisticImpl producers;
protected final CountStatisticImpl messages;
protected PollCountStatisticImpl messagesCached;
protected CountStatisticImpl dispatched;
protected CountStatisticImpl duplicateFromStore;
protected CountStatisticImpl inflight;
protected CountStatisticImpl expired;
protected TimeStatisticImpl processTime;
protected CountStatisticImpl blockedSends;
protected TimeStatisticImpl blockedTime;
protected SizeStatisticImpl messageSize;
protected CountStatisticImpl maxUncommittedExceededCount;

// [AMQ-9437] Advanced Network Statistics are optionally enabled
protected final AtomicBoolean advancedNetworkStatisticsEnabled = new AtomicBoolean(false);
protected CountStatisticImpl networkEnqueues;
protected CountStatisticImpl networkDequeues;

// [AMQ-8463] Advanced Message Statistics are optionally enabled
protected final CountStatisticImpl dispatched;
protected final CountStatisticImpl duplicateFromStore;
protected final CountStatisticImpl inflight;
protected final CountStatisticImpl expired;
protected final TimeStatisticImpl processTime;
protected final CountStatisticImpl blockedSends;
protected final TimeStatisticImpl blockedTime;
protected final SizeStatisticImpl messageSize;
protected final CountStatisticImpl maxUncommittedExceededCount;

// [AMQ-9437] Advanced Network Statistics
protected final CountStatisticImpl networkEnqueues;
protected final CountStatisticImpl networkDequeues;

// [AMQ-8463] Advanced Message Statistics are disabled by default
protected final AtomicBoolean advancedMessageStatisticsEnabled = new AtomicBoolean(false);
protected UnsampledStatisticImpl<Long> enqueuedMessageBrokerInTime;
protected UnsampledStatisticImpl<String> enqueuedMessageClientID;
protected UnsampledStatisticImpl<String> enqueuedMessageID;
protected UnsampledStatisticImpl<Long> enqueuedMessageTimestamp;
protected UnsampledStatisticImpl<Long> dequeuedMessageBrokerInTime;
protected UnsampledStatisticImpl<Long> dequeuedMessageBrokerOutTime;
protected UnsampledStatisticImpl<String> dequeuedMessageClientID;
protected UnsampledStatisticImpl<String> dequeuedMessageID;
protected UnsampledStatisticImpl<Long> dequeuedMessageTimestamp;
protected volatile MessageFlowStatsImpl messageFlowStats;

public DestinationStatistics() {

Expand All @@ -90,15 +75,11 @@ public DestinationStatistics() {
messageSize = new SizeStatisticImpl("messageSize","Size of messages passing through the destination");
maxUncommittedExceededCount = new CountStatisticImpl("maxUncommittedExceededCount", "number of times maxUncommittedCount has been exceeded");

addStatistics(Set.of(enqueues, dispatched, dequeues, duplicateFromStore, inflight, expired, consumers,
producers, messages, messagesCached, processTime, blockedSends, blockedTime, messageSize, maxUncommittedExceededCount));

if(advancedNetworkStatisticsEnabled.get()) {
enableAdvancedNetworkStatistics();
}
networkEnqueues = new CountStatisticImpl("networkEnqueues", "The number of messages that have been sent to the destination via network connection");
networkDequeues = new CountStatisticImpl("networkDequeues", "The number of messages that have been acknowledged from the destination via network connection");

if(advancedMessageStatisticsEnabled.get()) {
enableAdvancedMessageStatistics();
messageFlowStats = new MessageFlowStatsImpl();
}
}

Expand Down Expand Up @@ -168,7 +149,6 @@ public CountStatisticImpl getMaxUncommittedExceededCount(){
return this.maxUncommittedExceededCount;
}

// FIXME: Ugh.. all these would need to be behind a lock as well
public CountStatisticImpl getNetworkEnqueues() {
return networkEnqueues;
}
Expand All @@ -177,40 +157,8 @@ public CountStatisticImpl getNetworkDequeues() {
return networkDequeues;
}

public UnsampledStatistic<Long> getEnqueuedMessageBrokerInTime() {
return enqueuedMessageBrokerInTime;
}

public UnsampledStatistic<String> getEnqueuedMessageClientID() {
return enqueuedMessageClientID;
}

public UnsampledStatistic<String> getEnqueuedMessageID() {
return enqueuedMessageID;
}

public UnsampledStatistic<Long> getEnqueuedMessageTimestamp() {
return enqueuedMessageTimestamp;
}

public UnsampledStatistic<Long> getDequeuedMessageBrokerInTime() {
return dequeuedMessageBrokerInTime;
}

public UnsampledStatistic<Long> getDequeuedMessageBrokerOutTime() {
return dequeuedMessageBrokerOutTime;
}

public UnsampledStatistic<String> getDequeuedMessageClientID() {
return dequeuedMessageClientID;
}

public UnsampledStatistic<String> getDequeuedMessageID() {
return dequeuedMessageID;
}

public UnsampledStatistic<Long> getDequeuedMessageTimestamp() {
return dequeuedMessageTimestamp;
public MessageFlowStats getMessageFlowStats() {
return messageFlowStats;
}

public void reset() {
Expand All @@ -227,22 +175,12 @@ public void reset() {
blockedTime.reset();
messageSize.reset();
maxUncommittedExceededCount.reset();
networkEnqueues.reset();
networkDequeues.reset();

if(advancedMessageStatisticsEnabled.get()) {
enqueuedMessageBrokerInTime.reset();
enqueuedMessageClientID.reset();
enqueuedMessageID.reset();
enqueuedMessageTimestamp.reset();
dequeuedMessageBrokerInTime.reset();
dequeuedMessageBrokerOutTime.reset();
dequeuedMessageClientID.reset();
dequeuedMessageID.reset();
dequeuedMessageTimestamp.reset();
}

if(advancedNetworkStatisticsEnabled.get()) {
networkEnqueues.reset();
networkDequeues.reset();
MessageFlowStatsImpl tmpMessageFlowStats = messageFlowStats;
if(advancedMessageStatisticsEnabled.get() && tmpMessageFlowStats != null) {
tmpMessageFlowStats.reset();
}
}
}
Expand Down Expand Up @@ -271,16 +209,10 @@ public void setEnabled(boolean enabled) {
networkDequeues.setEnabled(enabled);

// [AMQ-9437] Advanced Message Statistics
enqueuedMessageBrokerInTime.setEnabled(enabled);
enqueuedMessageClientID.setEnabled(enabled);
enqueuedMessageID.setEnabled(enabled);
enqueuedMessageTimestamp.setEnabled(enabled);
dequeuedMessageBrokerInTime.setEnabled(enabled);
dequeuedMessageBrokerOutTime.setEnabled(enabled);
dequeuedMessageClientID.setEnabled(enabled);
dequeuedMessageID.setEnabled(enabled);
dequeuedMessageTimestamp.setEnabled(enabled);

MessageFlowStatsImpl tmpMessageFlowStats = messageFlowStats;
if(tmpMessageFlowStats != null) {
tmpMessageFlowStats.setEnabled(enabled);
}
}

public void setParent(DestinationStatistics parent) {
Expand Down Expand Up @@ -327,68 +259,18 @@ public void setParent(DestinationStatistics parent) {
}
}

// FIXME: This needs to use a reentrant lock instead
public synchronized void setAdvancedMessageStatisticsEnabled(boolean advancedMessageStatisticsEnabled) {
synchronized(this.advancedMessageStatisticsEnabled) {
if(!this.advancedMessageStatisticsEnabled.getAndSet(advancedMessageStatisticsEnabled)) {
enableAdvancedMessageStatistics();
} else {
disableAdvancedMessageStatistics();
}
}
}

public synchronized void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatisticsEnabled) {
synchronized(this.advancedNetworkStatisticsEnabled) {
if(this.advancedNetworkStatisticsEnabled.getAndSet(advancedNetworkStatisticsEnabled)) {
enableAdvancedNetworkStatistics();
} else {
disableAdvancedNetworkStatistics();
}
if(advancedMessageStatisticsEnabled) {
this.messageFlowStats = new MessageFlowStatsImpl();
this.messageFlowStats.setEnabled(enabled);
this.advancedMessageStatisticsEnabled.set(advancedMessageStatisticsEnabled);
} else {
this.advancedMessageStatisticsEnabled.set(advancedMessageStatisticsEnabled);
this.messageFlowStats = null;
}
}

private void enableAdvancedMessageStatistics() {
enqueuedMessageBrokerInTime = new UnsampledStatisticImpl<>("enqueuedMessageBrokerInTime", "ms", "Broker in time (ms) of last enqueued message to the destination", Long.valueOf(0l));
enqueuedMessageClientID = new UnsampledStatisticImpl<>("enqueuedMessageClientID", "id", "ClientID of last enqueued message to the destination", null);
enqueuedMessageID = new UnsampledStatisticImpl<>("enqueuedMessageID", "id", "MessageID of last enqueued message to the destination", null);
enqueuedMessageTimestamp = new UnsampledStatisticImpl<>("enqueuedMessageTimestamp", "ms", "Message timestamp of last enqueued message to the destination", Long.valueOf(0l));

dequeuedMessageBrokerInTime = new UnsampledStatisticImpl<>("dequeuedMessageBrokerInTime", "ms", "Broker in time (ms) of last dequeued message to the destination", Long.valueOf(0l));
dequeuedMessageBrokerOutTime = new UnsampledStatisticImpl<>("dequeuedMessageBrokerOutTime", "ms", "Broker out time (ms) of last dequeued message to the destination", Long.valueOf(0l));
dequeuedMessageClientID = new UnsampledStatisticImpl<>("dequeuedMessageClientID", "id", "ClientID of last dequeued message to the destination", null);
dequeuedMessageID = new UnsampledStatisticImpl<>("dequeuedMessageID", "id", "MessageID of last dequeued message to the destination", null);
dequeuedMessageTimestamp = new UnsampledStatisticImpl<>("dequeuedMessageTimestamp", "ms", "Message timestamp of last dequeued message to the destination", Long.valueOf(0l));

addStatistics(Set.of(enqueuedMessageBrokerInTime, enqueuedMessageClientID, enqueuedMessageID, enqueuedMessageTimestamp,
dequeuedMessageBrokerInTime, dequeuedMessageBrokerOutTime, dequeuedMessageClientID, dequeuedMessageID, dequeuedMessageTimestamp));
}

private void disableAdvancedMessageStatistics() {
removeStatistics(Set.of(enqueuedMessageBrokerInTime, enqueuedMessageClientID, enqueuedMessageID, enqueuedMessageTimestamp,
dequeuedMessageBrokerInTime, dequeuedMessageBrokerOutTime, dequeuedMessageClientID, dequeuedMessageID, dequeuedMessageTimestamp));

enqueuedMessageBrokerInTime = null;
enqueuedMessageClientID = null;
enqueuedMessageID = null;
enqueuedMessageTimestamp = null;

dequeuedMessageBrokerInTime = null;
dequeuedMessageBrokerOutTime = null;
dequeuedMessageClientID = null;
dequeuedMessageID = null;
dequeuedMessageTimestamp = null;
}

private void enableAdvancedNetworkStatistics() {
networkEnqueues = new CountStatisticImpl("networkEnqueues", "The number of messages that have been sent to the destination via network connection");
networkDequeues = new CountStatisticImpl("networkDequeues", "The number of messages that have been acknowledged from the destination via network connection");
addStatistics(Set.of(networkEnqueues, networkDequeues));
}

private void disableAdvancedNetworkStatistics() {
removeStatistics(Set.of(networkEnqueues, networkDequeues));
networkEnqueues = null;
networkDequeues = null;
public synchronized boolean isAdvancedMessageStatisticsEnabled() {
return this.advancedMessageStatisticsEnabled.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.management.MessageFlowStats;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.usage.SystemUsage;
Expand Down Expand Up @@ -374,12 +375,10 @@ public void afterRollback() throws Exception {
if(((Destination)node.getRegionDestination()).isAdvancedNetworkStatisticsEnabled() && getContext() != null && getContext().isNetworkConnection()) {
((Destination)node.getRegionDestination()).getDestinationStatistics().getNetworkDequeues().add(ack.getMessageCount());
}
if(((Destination)node.getRegionDestination()).isAdvancedMessageStatisticsEnabled()) {
((Destination)node.getRegionDestination()).getDestinationStatistics().getDequeuedMessageBrokerInTime().setValue(node.getMessage().getBrokerInTime());
((Destination)node.getRegionDestination()).getDestinationStatistics().getDequeuedMessageBrokerOutTime().setValue(node.getMessage().getBrokerOutTime());
((Destination)node.getRegionDestination()).getDestinationStatistics().getDequeuedMessageClientID().setValue(context.getClientId());
((Destination)node.getRegionDestination()).getDestinationStatistics().getDequeuedMessageID().setValue(node.getMessageId().toString());
((Destination)node.getRegionDestination()).getDestinationStatistics().getDequeuedMessageTimestamp().setValue(node.getMessage().getTimestamp());

MessageFlowStats tmpMessageFlowStats = ((Destination)node.getRegionDestination()).getDestinationStatistics().getMessageFlowStats();
if(((Destination)node.getRegionDestination()).isAdvancedMessageStatisticsEnabled() && tmpMessageFlowStats != null) {
tmpMessageFlowStats.dequeueStats(context.getClientId(), node.getMessageId().toString(), node.getMessage().getTimestamp(), node.getMessage().getBrokerInTime(), node.getMessage().getBrokerOutTime());
}
}
}
Expand Down
Loading

0 comments on commit df08462

Please sign in to comment.