Skip to content

Commit

Permalink
[AMQ-8463] New feature: Destination advancedMessageStatistics
Browse files Browse the repository at this point in the history
  • Loading branch information
mattrpav committed Nov 18, 2024
1 parent 3400983 commit 583dce9
Show file tree
Hide file tree
Showing 25 changed files with 753 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -620,4 +620,54 @@ public long getNetworkDequeues() {
return destination.getDestinationStatistics().getNetworkDequeues().getCount();
}

@Override
public boolean isAdvancedMessageStatisticsEnabled() {
return destination.isAdvancedMessageStatisticsEnabled();
}

@Override
public void setAdvancedMessageStatisticsEnabled(boolean advancedMessageStatisticsEnabled) {
destination.setAdvancedMessageStatisticsEnabled(advancedMessageStatisticsEnabled);
}

@Override
public long getEnqueuedMessageBrokerInTime() {
return destination.getDestinationStatistics().getEnqueuedMessageBrokerInTime().getValue();
}

@Override
public String getEnqueuedMessageClientId() {
return destination.getDestinationStatistics().getEnqueuedMessageClientID().getValue();
}

@Override
public String getEnqueuedMessageId() {
return destination.getDestinationStatistics().getEnqueuedMessageID().getValue();
}

@Override
public long getEnqueuedMessageTimestamp() {
return destination.getDestinationStatistics().getEnqueuedMessageTimestamp().getValue();
}

@Override
public long getDequeuedMessageBrokerInTime() {
return destination.getDestinationStatistics().getDequeuedMessageBrokerInTime().getValue();
}

@Override
public String getDequeuedMessageClientId() {
return destination.getDestinationStatistics().getDequeuedMessageClientID().getValue();
}

@Override
public String getDequeuedMessageId() {
return destination.getDestinationStatistics().getDequeuedMessageID().getValue();
}

@Override
public long getDequeuedMessageTimestamp() {
return destination.getDestinationStatistics().getDequeuedMessageTimestamp().getValue();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -493,4 +493,34 @@ public String sendTextMessageWithProperties(@MBeanInfo("properties") String prop

@MBeanInfo("Number of messages acknowledged from the destination via network connection")
long getNetworkDequeues();

@MBeanInfo("Query Advanced Message Statistics flag")
boolean isAdvancedMessageStatisticsEnabled();

@MBeanInfo("Toggle Advanced Message Statistics flag")
void setAdvancedMessageStatisticsEnabled(boolean advancedMessageStatisticsEnabled);

@MBeanInfo("Broker in time (ms) of last enqueued message to the destination")
long getEnqueuedMessageBrokerInTime();

@MBeanInfo("ClientID of last enqueued message to the destination")
String getEnqueuedMessageClientId();

@MBeanInfo("MessageID of last enqueued message to the destination")
String getEnqueuedMessageId();

@MBeanInfo("Message timestamp in (ms) of last enqueued message to the destination")
long getEnqueuedMessageTimestamp();

@MBeanInfo("Broker in time (ms) of last dequeued message to the destination")
long getDequeuedMessageBrokerInTime();

@MBeanInfo("ClientID of last dequeued message to the destination")
String getDequeuedMessageClientId();

@MBeanInfo("MessageID of last dequeued message to the destination")
String getDequeuedMessageId();

@MBeanInfo("Message timestamp in (ms) of last dequeued message to the destination")
long getDequeuedMessageTimestamp();
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ public abstract class BaseDestination implements Destination {
private boolean disposed = false;
private boolean doOptimzeMessageStorage = true;
private boolean advancedNetworkStatisticsEnabled = false;
private boolean advancedMessageStatisticsEnabled = false;

/*
* percentage of in-flight messages above which optimize message store is disabled
Expand Down Expand Up @@ -880,6 +881,16 @@ public void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatistic
this.advancedNetworkStatisticsEnabled = advancedNetworkStatisticsEnabled;
}

@Override
public boolean isAdvancedMessageStatisticsEnabled() {
return this.advancedMessageStatisticsEnabled;
}

@Override
public void setAdvancedMessageStatisticsEnabled(boolean advancedMessageStatisticsEnabled) {
this.advancedMessageStatisticsEnabled = advancedMessageStatisticsEnabled;
}

@Override
public abstract List<Subscription> getConsumers();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,4 +264,9 @@ public interface Destination extends Service, Task, Message.MessageDestination {

void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatisticsEnabled);

// [AMQ-8463]
boolean isAdvancedMessageStatisticsEnabled();

void setAdvancedMessageStatisticsEnabled(boolean advancedMessageStatisticsEnabled);

}
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,16 @@ public void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatistic
next.setAdvancedNetworkStatisticsEnabled(advancedNetworkStatisticsEnabled);
}

@Override
public boolean isAdvancedMessageStatisticsEnabled() {
return next.isAdvancedMessageStatisticsEnabled();
}

@Override
public void setAdvancedMessageStatisticsEnabled(boolean advancedMessageStatisticsEnabled) {
next.setAdvancedMessageStatisticsEnabled(advancedMessageStatisticsEnabled);
}

public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception {
if (next instanceof DestinationFilter) {
DestinationFilter filter = (DestinationFilter) next;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,16 @@ public class DestinationStatistics extends StatsImpl {
protected CountStatisticImpl networkEnqueues;
protected CountStatisticImpl networkDequeues;

// [AMQ-8463] Advanced Message Statistics are optionally enabled
protected LongStatisticImpl enqueuedMessageBrokerInTime;
protected StringStatisticImpl enqueuedMessageClientID;
protected StringStatisticImpl enqueuedMessageID;
protected LongStatisticImpl enqueuedMessageTimestamp;
protected LongStatisticImpl dequeuedMessageBrokerInTime;
protected StringStatisticImpl dequeuedMessageClientID;
protected StringStatisticImpl dequeuedMessageID;
protected LongStatisticImpl dequeuedMessageTimestamp;

public DestinationStatistics() {

enqueues = new CountStatisticImpl("enqueues", "The number of messages that have been sent to the destination");
Expand All @@ -76,6 +86,16 @@ public DestinationStatistics() {
networkEnqueues = new CountStatisticImpl("networkEnqueues", "The number of messages that have been sent to the destination via network connection");
networkDequeues = new CountStatisticImpl("networkDequeues", "The number of messages that have been acknowledged from the destination via network connection");

enqueuedMessageBrokerInTime = new LongStatisticImpl("enqueuedMessageBrokerInTime", "Broker in time (ms) of last enqueued message to the destination");
enqueuedMessageClientID = new StringStatisticImpl("enqueuedMessageClientID", "ClientID of last enqueued message to the destination");
enqueuedMessageID = new StringStatisticImpl("enqueuedMessageID", "MessageID of last enqueued message to the destination");
enqueuedMessageTimestamp = new LongStatisticImpl("enqueuedMessageTimestamp", "Message timestamp of last enqueued message to the destination");

dequeuedMessageBrokerInTime = new LongStatisticImpl("dequeuedMessageBrokerInTime", "Broker in time (ms) of last dequeued message to the destination");
dequeuedMessageClientID = new StringStatisticImpl("dequeuedMessageClientID", "ClientID of last dequeued message to the destination");
dequeuedMessageID = new StringStatisticImpl("dequeuedMessageID", "MessageID of last dequeued message to the destination");
dequeuedMessageTimestamp = new LongStatisticImpl("dequeuedMessageTimestamp", "Message timestamp of last dequeued message to the destination");

addStatistic("enqueues", enqueues);
addStatistic("dispatched", dispatched);
addStatistic("dequeues", dequeues);
Expand All @@ -94,6 +114,15 @@ public DestinationStatistics() {

addStatistic("networkEnqueues", networkEnqueues);
addStatistic("networkDequeues", networkDequeues);

addStatistic("enqueuedMessageBrokerInTime", enqueuedMessageBrokerInTime);
addStatistic("enqueuedMessageClientID", enqueuedMessageClientID);
addStatistic("enqueuedMessageID", enqueuedMessageID);
addStatistic("enqueuedMessageTimestamp", enqueuedMessageTimestamp);
addStatistic("dequeuedMessageBrokerInTime", dequeuedMessageBrokerInTime);
addStatistic("dequeuedMessageClientID", dequeuedMessageClientID);
addStatistic("dequeuedMessageID", dequeuedMessageID);
addStatistic("dequeuedMessageTimestamp", dequeuedMessageTimestamp);
}

public CountStatisticImpl getEnqueues() {
Expand Down Expand Up @@ -170,6 +199,38 @@ public CountStatisticImpl getNetworkDequeues() {
return networkDequeues;
}

public LongStatisticImpl getEnqueuedMessageBrokerInTime() {
return enqueuedMessageBrokerInTime;
}

public StringStatisticImpl getEnqueuedMessageClientID() {
return enqueuedMessageClientID;
}

public StringStatisticImpl getEnqueuedMessageID() {
return enqueuedMessageID;
}

public LongStatisticImpl getEnqueuedMessageTimestamp() {
return enqueuedMessageTimestamp;
}

public LongStatisticImpl getDequeuedMessageBrokerInTime() {
return dequeuedMessageBrokerInTime;
}

public StringStatisticImpl getDequeuedMessageClientID() {
return dequeuedMessageClientID;
}

public StringStatisticImpl getDequeuedMessageID() {
return dequeuedMessageID;
}

public LongStatisticImpl getDequeuedMessageTimestamp() {
return dequeuedMessageTimestamp;
}

public void reset() {
if (this.isDoReset()) {
super.reset();
Expand All @@ -186,6 +247,14 @@ public void reset() {
maxUncommittedExceededCount.reset();
networkEnqueues.reset();
networkDequeues.reset();
enqueuedMessageBrokerInTime.reset();
enqueuedMessageClientID.reset();
enqueuedMessageID.reset();
enqueuedMessageTimestamp.reset();
dequeuedMessageBrokerInTime.reset();
dequeuedMessageClientID.reset();
dequeuedMessageID.reset();
dequeuedMessageTimestamp.reset();
}
}

Expand All @@ -208,9 +277,20 @@ public void setEnabled(boolean enabled) {
messageSize.setEnabled(enabled);
maxUncommittedExceededCount.setEnabled(enabled);

// [AMQ-9437] Advanced Statistics
// [AMQ-9437] Advanced Network Statistics
networkEnqueues.setEnabled(enabled);
networkDequeues.setEnabled(enabled);

// [AMQ-9437] Advanced Message Statistics
enqueuedMessageBrokerInTime.setEnabled(enabled);
enqueuedMessageClientID.setEnabled(enabled);
enqueuedMessageID.setEnabled(enabled);
enqueuedMessageTimestamp.setEnabled(enabled);
dequeuedMessageBrokerInTime.setEnabled(enabled);
dequeuedMessageClientID.setEnabled(enabled);
dequeuedMessageID.setEnabled(enabled);
dequeuedMessageTimestamp.setEnabled(enabled);

}

public void setParent(DestinationStatistics parent) {
Expand All @@ -233,6 +313,7 @@ public void setParent(DestinationStatistics parent) {
maxUncommittedExceededCount.setParent(parent.maxUncommittedExceededCount);
networkEnqueues.setParent(parent.networkEnqueues);
networkDequeues.setParent(parent.networkDequeues);
// [AMQ-9437] Advanced Message Statistics do not parent.
} else {
enqueues.setParent(null);
dispatched.setParent(null);
Expand All @@ -252,6 +333,7 @@ public void setParent(DestinationStatistics parent) {
maxUncommittedExceededCount.setParent(null);
networkEnqueues.setParent(null);
networkDequeues.setParent(null);
// [AMQ-9437] Advanced Message Statistics do not parent.
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,12 @@ public void afterRollback() throws Exception {
if(((Destination)node.getRegionDestination()).isAdvancedNetworkStatisticsEnabled() && getContext() != null && getContext().isNetworkConnection()) {
((Destination)node.getRegionDestination()).getDestinationStatistics().getNetworkDequeues().add(ack.getMessageCount());
}
if(((Destination)node.getRegionDestination()).isAdvancedMessageStatisticsEnabled()) {
((Destination)node.getRegionDestination()).getDestinationStatistics().getDequeuedMessageBrokerInTime().setValue(node.getMessage().getBrokerInTime());
((Destination)node.getRegionDestination()).getDestinationStatistics().getDequeuedMessageClientID().setValue(context.getClientId());
((Destination)node.getRegionDestination()).getDestinationStatistics().getDequeuedMessageID().setValue(node.getMessageId().toString());
((Destination)node.getRegionDestination()).getDestinationStatistics().getDequeuedMessageTimestamp().setValue(node.getMessage().getTimestamp());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1916,6 +1916,13 @@ private void dropMessage(ConnectionContext context, QueueMessageReference refere
getDestinationStatistics().getDequeues().increment();
getDestinationStatistics().getMessages().decrement();

if(isAdvancedMessageStatisticsEnabled()) {
getDestinationStatistics().getDequeuedMessageBrokerInTime().setValue(reference.getMessage().getBrokerInTime());
getDestinationStatistics().getDequeuedMessageClientID().setValue(context.getClientId());
getDestinationStatistics().getDequeuedMessageID().setValue(reference.getMessageId().toString());
getDestinationStatistics().getDequeuedMessageTimestamp().setValue(reference.getMessage().getTimestamp());
}

if(isAdvancedNetworkStatisticsEnabled() && context.getConnection() != null && context.getConnection().isNetworkConnection()) {
getDestinationStatistics().getNetworkDequeues().increment();
}
Expand Down Expand Up @@ -1975,6 +1982,13 @@ final void messageSent(final ConnectionContext context, final Message msg) throw
destinationStatistics.getMessages().increment();
destinationStatistics.getMessageSize().addSize(msg.getSize());

if(isAdvancedMessageStatisticsEnabled()) {
destinationStatistics.getEnqueuedMessageBrokerInTime().setValue(msg.getBrokerInTime());
destinationStatistics.getEnqueuedMessageClientID().setValue(context.getClientId());
destinationStatistics.getEnqueuedMessageID().setValue(msg.getMessageId().toString());
destinationStatistics.getEnqueuedMessageTimestamp().setValue(msg.getTimestamp());
}

if(isAdvancedNetworkStatisticsEnabled() && context.getConnection() != null && context.getConnection().isNetworkConnection()) {
destinationStatistics.getNetworkEnqueues().increment();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -779,6 +779,13 @@ protected void dispatch(final ConnectionContext context, Message message) throws
// destinationStatistics.getMessages().increment();
destinationStatistics.getEnqueues().increment();

if(isAdvancedMessageStatisticsEnabled()) {
destinationStatistics.getEnqueuedMessageBrokerInTime().setValue(message.getBrokerInTime());
destinationStatistics.getEnqueuedMessageClientID().setValue(context.getClientId());
destinationStatistics.getEnqueuedMessageID().setValue(message.getMessageId().toString());
destinationStatistics.getEnqueuedMessageTimestamp().setValue(message.getTimestamp());
}

if(isAdvancedNetworkStatisticsEnabled() && context != null && context.isNetworkConnection()) {
destinationStatistics.getNetworkEnqueues().increment();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,10 @@ private void incrementStatsOnAck(final Destination destination, final MessageAck
destination.getDestinationStatistics().getNetworkDequeues().add(count);
}
}
if(destination.isAdvancedMessageStatisticsEnabled()) {
destination.getDestinationStatistics().getDequeuedMessageClientID().setValue(context.getClientId());
destination.getDestinationStatistics().getDequeuedMessageID().setValue(ack.getLastMessageId().toString());
}
if (ack.isExpiredAck()) {
destination.getDestinationStatistics().getExpired().add(count);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ public class PolicyEntry extends DestinationMapEntry {
private int maxDestinations = -1;
private boolean useTopicSubscriptionInflightStats = true;
private boolean advancedNetworkStatisticsEnabled = false; // [AMQ-9437]
private boolean advancedMessageStatisticsEnabled = false; // [AMQ-8463]

/*
* percentage of in-flight messages above which optimize message store is disabled
*/
Expand Down Expand Up @@ -309,6 +311,9 @@ public void baseUpdate(BaseDestination destination, Set<String> includedProperti
if (isUpdate("advancedNetworkStatisticsEnabled", includedProperties)) {
destination.setAdvancedNetworkStatisticsEnabled(isAdvancedNetworkStatisticsEnabled());
}
if (isUpdate("advancedMessageStatisticsEnabled", includedProperties)) {
destination.setAdvancedMessageStatisticsEnabled(isAdvancedMessageStatisticsEnabled());
}
}

public void baseConfiguration(Broker broker, BaseDestination destination) {
Expand Down Expand Up @@ -1187,4 +1192,12 @@ public boolean isAdvancedNetworkStatisticsEnabled() {
public void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatisticsEnabled) {
this.advancedNetworkStatisticsEnabled = advancedNetworkStatisticsEnabled;
}

public boolean isAdvancedMessageStatisticsEnabled() {
return this.advancedMessageStatisticsEnabled;
}

public void setAdvancedMessageStatisticsEnabled(boolean advancedMessageStatisticsEnabled) {
this.advancedMessageStatisticsEnabled = advancedMessageStatisticsEnabled;
}
}
Loading

0 comments on commit 583dce9

Please sign in to comment.