Skip to content

Commit

Permalink
[AMQ-8463] New feature: advancedMessageStatistics
Browse files Browse the repository at this point in the history
  • Loading branch information
mattrpav committed Oct 25, 2024
1 parent 7a86e85 commit 43c80c2
Show file tree
Hide file tree
Showing 20 changed files with 616 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -620,4 +620,54 @@ public long getNetworkDequeues() {
return destination.getDestinationStatistics().getNetworkDequeues().getCount();
}

@Override
public boolean isAdvancedMessageStatisticsEnabled() {
return destination.isAdvancedMessageStatisticsEnabled();
}

@Override
public void setAdvancedMessageStatisticsEnabled(boolean advancedMessageStatisticsEnabled) {
destination.setAdvancedMessageStatisticsEnabled(advancedMessageStatisticsEnabled);
}

@Override
public long getEnqueuedMessageBrokerInTime() {
return destination.getDestinationStatistics().getEnqueuedMessageBrokerInTime().getValue();
}

@Override
public String getEnqueuedMessageClientId() {
return destination.getDestinationStatistics().getEnqueuedMessageClientID().getValue();
}

@Override
public String getEnqueuedMessageId() {
return destination.getDestinationStatistics().getEnqueuedMessageID().getValue();
}

@Override
public long getEnqueuedMessageTimestamp() {
return destination.getDestinationStatistics().getEnqueuedMessageTimestamp().getValue();
}

@Override
public long getDequeuedMessageBrokerInTime() {
return destination.getDestinationStatistics().getDequeuedMessageBrokerInTime().getValue();
}

@Override
public String getDequeuedMessageClientId() {
return destination.getDestinationStatistics().getDequeuedMessageClientID().getValue();
}

@Override
public String getDequeuedMessageId() {
return destination.getDestinationStatistics().getDequeuedMessageID().getValue();
}

@Override
public long getDequeuedMessageTimestamp() {
return destination.getDestinationStatistics().getDequeuedMessageTimestamp().getValue();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -493,4 +493,34 @@ public String sendTextMessageWithProperties(@MBeanInfo("properties") String prop

@MBeanInfo("Number of messages acknowledged from the destination via network connection")
long getNetworkDequeues();

@MBeanInfo("Query Advanced Message Statistics flag")
boolean isAdvancedMessageStatisticsEnabled();

@MBeanInfo("Toggle Advanced Message Statistics flag")
void setAdvancedMessageStatisticsEnabled(boolean advancedMessageStatisticsEnabled);

@MBeanInfo("Broker in time (ms) of last enqueued message to the destination")
long getEnqueuedMessageBrokerInTime();

@MBeanInfo("ClientID of last enqueued message to the destination")
String getEnqueuedMessageClientId();

@MBeanInfo("MessageID of last enqueued message to the destination")
String getEnqueuedMessageId();

@MBeanInfo("Message timestamp in (ms) of last enqueued message to the destination")
long getEnqueuedMessageTimestamp();

@MBeanInfo("Broker in time (ms) of last dequeued message to the destination")
long getDequeuedMessageBrokerInTime();

@MBeanInfo("ClientID of last dequeued message to the destination")
String getDequeuedMessageClientId();

@MBeanInfo("MessageID of last dequeued message to the destination")
String getDequeuedMessageId();

@MBeanInfo("Message timestamp in (ms) of last dequeued message to the destination")
long getDequeuedMessageTimestamp();
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ public abstract class BaseDestination implements Destination {
private boolean disposed = false;
private boolean doOptimzeMessageStorage = true;
private boolean advancedNetworkStatisticsEnabled = false;
private boolean advancedMessageStatisticsEnabled = false;

/*
* percentage of in-flight messages above which optimize message store is disabled
Expand Down Expand Up @@ -880,6 +881,16 @@ public void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatistic
this.advancedNetworkStatisticsEnabled = advancedNetworkStatisticsEnabled;
}

@Override
public boolean isAdvancedMessageStatisticsEnabled() {
return this.advancedMessageStatisticsEnabled;
}

@Override
public void setAdvancedMessageStatisticsEnabled(boolean advancedMessageStatisticsEnabled) {
this.advancedMessageStatisticsEnabled = advancedMessageStatisticsEnabled;
}

@Override
public abstract List<Subscription> getConsumers();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,4 +264,9 @@ public interface Destination extends Service, Task, Message.MessageDestination {

void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatisticsEnabled);

// [AMQ-8463]
boolean isAdvancedMessageStatisticsEnabled();

void setAdvancedMessageStatisticsEnabled(boolean advancedMessageStatisticsEnabled);

}
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,16 @@ public void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatistic
next.setAdvancedNetworkStatisticsEnabled(advancedNetworkStatisticsEnabled);
}

@Override
public boolean isAdvancedMessageStatisticsEnabled() {
return next.isAdvancedMessageStatisticsEnabled();
}

@Override
public void setAdvancedMessageStatisticsEnabled(boolean advancedMessageStatisticsEnabled) {
next.setAdvancedMessageStatisticsEnabled(advancedMessageStatisticsEnabled);
}

public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception {
if (next instanceof DestinationFilter) {
DestinationFilter filter = (DestinationFilter) next;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,16 @@ public class DestinationStatistics extends StatsImpl {
protected CountStatisticImpl networkEnqueues;
protected CountStatisticImpl networkDequeues;

// [AMQ-8463] Advanced Message Statistics are optionally enabled
protected LongStatisticImpl enqueuedMessageBrokerInTime;
protected StringStatisticImpl enqueuedMessageClientID;
protected StringStatisticImpl enqueuedMessageID;
protected LongStatisticImpl enqueuedMessageTimestamp;
protected LongStatisticImpl dequeuedMessageBrokerInTime;
protected StringStatisticImpl dequeuedMessageClientID;
protected StringStatisticImpl dequeuedMessageID;
protected LongStatisticImpl dequeuedMessageTimestamp;

public DestinationStatistics() {

enqueues = new CountStatisticImpl("enqueues", "The number of messages that have been sent to the destination");
Expand All @@ -76,6 +86,16 @@ public DestinationStatistics() {
networkEnqueues = new CountStatisticImpl("networkEnqueues", "The number of messages that have been sent to the destination via network connection");
networkDequeues = new CountStatisticImpl("networkDequeues", "The number of messages that have been acknowledged from the destination via network connection");

enqueuedMessageBrokerInTime = new LongStatisticImpl("enqueuedMessageBrokerInTime", "Broker in time (ms) of last enqueued message to the destination");
enqueuedMessageClientID = new StringStatisticImpl("enqueuedMessageClientID", "ClientID of last enqueued message to the destination");
enqueuedMessageID = new StringStatisticImpl("enqueuedMessageID", "MessageID of last enqueued message to the destination");
enqueuedMessageTimestamp = new LongStatisticImpl("enqueuedMessageTimestamp", "Message timestamp of last enqueued message to the destination");

dequeuedMessageBrokerInTime = new LongStatisticImpl("dequeuedMessageBrokerInTime", "Broker in time (ms) of last dequeued message to the destination");
dequeuedMessageClientID = new StringStatisticImpl("dequeuedMessageClientID", "ClientID of last dequeued message to the destination");
dequeuedMessageID = new StringStatisticImpl("dequeuedMessageID", "MessageID of last dequeued message to the destination");
dequeuedMessageTimestamp = new LongStatisticImpl("dequeuedMessageTimestamp", "Message timestamp of last dequeued message to the destination");

addStatistic("enqueues", enqueues);
addStatistic("dispatched", dispatched);
addStatistic("dequeues", dequeues);
Expand All @@ -94,6 +114,15 @@ public DestinationStatistics() {

addStatistic("networkEnqueues", networkEnqueues);
addStatistic("networkDequeues", networkDequeues);

addStatistic("enqueuedMessageBrokerInTime", enqueuedMessageBrokerInTime);
addStatistic("enqueuedMessageClientID", enqueuedMessageClientID);
addStatistic("enqueuedMessageID", enqueuedMessageID);
addStatistic("enqueuedMessageTimestamp", enqueuedMessageTimestamp);
addStatistic("dequeuedMessageBrokerInTime", dequeuedMessageBrokerInTime);
addStatistic("dequeuedMessageClientID", dequeuedMessageClientID);
addStatistic("dequeuedMessageID", dequeuedMessageID);
addStatistic("dequeuedMessageTimestamp", dequeuedMessageTimestamp);
}

public CountStatisticImpl getEnqueues() {
Expand Down Expand Up @@ -170,6 +199,38 @@ public CountStatisticImpl getNetworkDequeues() {
return networkDequeues;
}

public LongStatisticImpl getEnqueuedMessageBrokerInTime() {
return enqueuedMessageBrokerInTime;
}

public StringStatisticImpl getEnqueuedMessageClientID() {
return enqueuedMessageClientID;
}

public StringStatisticImpl getEnqueuedMessageID() {
return enqueuedMessageID;
}

public LongStatisticImpl getEnqueuedMessageTimestamp() {
return enqueuedMessageTimestamp;
}

public LongStatisticImpl getDequeuedMessageBrokerInTime() {
return dequeuedMessageBrokerInTime;
}

public StringStatisticImpl getDequeuedMessageClientID() {
return dequeuedMessageClientID;
}

public StringStatisticImpl getDequeuedMessageID() {
return dequeuedMessageID;
}

public LongStatisticImpl getDequeuedMessageTimestamp() {
return dequeuedMessageTimestamp;
}

public void reset() {
if (this.isDoReset()) {
super.reset();
Expand All @@ -186,6 +247,14 @@ public void reset() {
maxUncommittedExceededCount.reset();
networkEnqueues.reset();
networkDequeues.reset();
enqueuedMessageBrokerInTime.reset();
enqueuedMessageClientID.reset();
enqueuedMessageID.reset();
enqueuedMessageTimestamp.reset();
dequeuedMessageBrokerInTime.reset();
dequeuedMessageClientID.reset();
dequeuedMessageID.reset();
dequeuedMessageTimestamp.reset();
}
}

Expand All @@ -208,9 +277,20 @@ public void setEnabled(boolean enabled) {
messageSize.setEnabled(enabled);
maxUncommittedExceededCount.setEnabled(enabled);

// [AMQ-9437] Advanced Statistics
// [AMQ-9437] Advanced Network Statistics
networkEnqueues.setEnabled(enabled);
networkDequeues.setEnabled(enabled);

// [AMQ-9437] Advanced Message Statistics
enqueuedMessageBrokerInTime.setEnabled(enabled);
enqueuedMessageClientID.setEnabled(enabled);
enqueuedMessageID.setEnabled(enabled);
enqueuedMessageTimestamp.setEnabled(enabled);
dequeuedMessageBrokerInTime.setEnabled(enabled);
dequeuedMessageClientID.setEnabled(enabled);
dequeuedMessageID.setEnabled(enabled);
dequeuedMessageTimestamp.setEnabled(enabled);

}

public void setParent(DestinationStatistics parent) {
Expand All @@ -233,6 +313,7 @@ public void setParent(DestinationStatistics parent) {
maxUncommittedExceededCount.setParent(parent.maxUncommittedExceededCount);
networkEnqueues.setParent(parent.networkEnqueues);
networkDequeues.setParent(parent.networkDequeues);
// [AMQ-9437] Advanced Message Statistics do not parent.
} else {
enqueues.setParent(null);
dispatched.setParent(null);
Expand All @@ -252,6 +333,7 @@ public void setParent(DestinationStatistics parent) {
maxUncommittedExceededCount.setParent(null);
networkEnqueues.setParent(null);
networkDequeues.setParent(null);
// [AMQ-9437] Advanced Message Statistics do not parent.
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1916,6 +1916,13 @@ private void dropMessage(ConnectionContext context, QueueMessageReference refere
getDestinationStatistics().getDequeues().increment();
getDestinationStatistics().getMessages().decrement();

if(isAdvancedMessageStatisticsEnabled()) {
getDestinationStatistics().getDequeuedMessageBrokerInTime().setValue(reference.getMessage().getBrokerInTime());
getDestinationStatistics().getDequeuedMessageClientID().setValue(context.getClientId());
getDestinationStatistics().getDequeuedMessageID().setValue(reference.getMessageId().toString());
getDestinationStatistics().getDequeuedMessageTimestamp().setValue(reference.getMessage().getTimestamp());
}

if(isAdvancedNetworkStatisticsEnabled() && context.getConnection() != null && context.getConnection().isNetworkConnection()) {
getDestinationStatistics().getNetworkDequeues().increment();
}
Expand Down Expand Up @@ -1975,6 +1982,13 @@ final void messageSent(final ConnectionContext context, final Message msg) throw
destinationStatistics.getMessages().increment();
destinationStatistics.getMessageSize().addSize(msg.getSize());

if(isAdvancedMessageStatisticsEnabled()) {
destinationStatistics.getEnqueuedMessageBrokerInTime().setValue(msg.getBrokerInTime());
destinationStatistics.getEnqueuedMessageClientID().setValue(context.getClientId());
destinationStatistics.getEnqueuedMessageID().setValue(msg.getMessageId().toString());
destinationStatistics.getEnqueuedMessageTimestamp().setValue(msg.getTimestamp());
}

if(isAdvancedNetworkStatisticsEnabled() && context.getConnection() != null && context.getConnection().isNetworkConnection()) {
destinationStatistics.getNetworkEnqueues().increment();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -779,6 +779,13 @@ protected void dispatch(final ConnectionContext context, Message message) throws
// destinationStatistics.getMessages().increment();
destinationStatistics.getEnqueues().increment();

if(isAdvancedMessageStatisticsEnabled()) {
destinationStatistics.getEnqueuedMessageBrokerInTime().setValue(message.getBrokerInTime());
destinationStatistics.getEnqueuedMessageClientID().setValue(context.getClientId());
destinationStatistics.getEnqueuedMessageID().setValue(message.getMessageId().toString());
destinationStatistics.getEnqueuedMessageTimestamp().setValue(message.getTimestamp());
}

if(isAdvancedNetworkStatisticsEnabled() && context != null && context.isNetworkConnection()) {
destinationStatistics.getNetworkEnqueues().increment();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ public class PolicyEntry extends DestinationMapEntry {
private int maxDestinations = -1;
private boolean useTopicSubscriptionInflightStats = true;
private boolean advancedNetworkStatisticsEnabled = false; // [AMQ-9437]
private boolean advancedMessageStatisticsEnabled = false; // [AMQ-8463]

/*
* percentage of in-flight messages above which optimize message store is disabled
*/
Expand Down Expand Up @@ -309,6 +311,9 @@ public void baseUpdate(BaseDestination destination, Set<String> includedProperti
if (isUpdate("advancedNetworkStatisticsEnabled", includedProperties)) {
destination.setAdvancedNetworkStatisticsEnabled(isAdvancedNetworkStatisticsEnabled());
}
if (isUpdate("advancedMessageStatisticsEnabled", includedProperties)) {
destination.setAdvancedMessageStatisticsEnabled(isAdvancedMessageStatisticsEnabled());
}
}

public void baseConfiguration(Broker broker, BaseDestination destination) {
Expand Down Expand Up @@ -1187,4 +1192,12 @@ public boolean isAdvancedNetworkStatisticsEnabled() {
public void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatisticsEnabled) {
this.advancedNetworkStatisticsEnabled = advancedNetworkStatisticsEnabled;
}

public boolean isAdvancedMessageStatisticsEnabled() {
return this.advancedMessageStatisticsEnabled;
}

public void setAdvancedMessageStatisticsEnabled(boolean advancedMessageStatisticsEnabled) {
this.advancedMessageStatisticsEnabled = advancedMessageStatisticsEnabled;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.activemq.management;

/**
* A statistic to store a single long value that is not incremented
*
* Example: Store a timestamp value of a recent message
*
*/
public interface LongStatistic extends UnsampledStatistic {
public Long getValue();
}
Loading

0 comments on commit 43c80c2

Please sign in to comment.