From df0846200acf8cd7a967340487f6311124f89076 Mon Sep 17 00:00:00 2001 From: Matt Pavlovich Date: Thu, 12 Dec 2024 13:47:22 -0600 Subject: [PATCH] fixes --- .../activemq/broker/jmx/DestinationView.java | 29 ++- .../broker/region/BaseDestination.java | 1 - .../broker/region/DestinationStatistics.java | 210 ++++-------------- .../region/DurableTopicSubscription.java | 11 +- .../apache/activemq/broker/region/Queue.java | 27 ++- .../apache/activemq/broker/region/Topic.java | 9 +- .../broker/region/TopicSubscription.java | 8 +- .../activemq/management/MessageFlowStats.java | 32 +++ .../management/MessageFlowStatsImpl.java | 118 ++++++++++ .../apache/activemq/management/StatsImpl.java | 2 +- .../management/UnsampledStatisticImpl.java | 8 +- .../management/UnsampledStatsImpl.java | 41 ++++ .../NetworkAdvancedStatisticsTest.java | 61 ++--- 13 files changed, 321 insertions(+), 236 deletions(-) create mode 100644 activemq-client/src/main/java/org/apache/activemq/management/MessageFlowStats.java create mode 100644 activemq-client/src/main/java/org/apache/activemq/management/MessageFlowStatsImpl.java create mode 100644 activemq-client/src/main/java/org/apache/activemq/management/UnsampledStatsImpl.java diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java index 763c4cf323..58e936844f 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java @@ -52,6 +52,7 @@ import org.apache.activemq.command.Message; import org.apache.activemq.filter.BooleanExpression; import org.apache.activemq.filter.NonCachedMessageEvaluationContext; +import org.apache.activemq.management.MessageFlowStats; import org.apache.activemq.selector.SelectorParser; import org.apache.activemq.store.MessageStore; import org.apache.activemq.util.URISupport; @@ -632,47 +633,55 @@ public void setAdvancedMessageStatisticsEnabled(boolean advancedMessageStatistic @Override public long getEnqueuedMessageBrokerInTime() { - return destination.getDestinationStatistics().getEnqueuedMessageBrokerInTime().getValue(); + MessageFlowStats tmpMessageFlowStats = destination.getDestinationStatistics().getMessageFlowStats(); + return (tmpMessageFlowStats != null ? tmpMessageFlowStats.getEnqueuedMessageBrokerInTime().getValue() : 0l); } @Override public String getEnqueuedMessageClientId() { - return destination.getDestinationStatistics().getEnqueuedMessageClientID().getValue(); + MessageFlowStats tmpMessageFlowStats = destination.getDestinationStatistics().getMessageFlowStats(); + return (tmpMessageFlowStats != null ? tmpMessageFlowStats.getEnqueuedMessageClientID().getValue() : null); } @Override public String getEnqueuedMessageId() { - return destination.getDestinationStatistics().getEnqueuedMessageID().getValue(); + MessageFlowStats tmpMessageFlowStats = destination.getDestinationStatistics().getMessageFlowStats(); + return (tmpMessageFlowStats != null ? tmpMessageFlowStats.getEnqueuedMessageID().getValue() : null); } @Override public long getEnqueuedMessageTimestamp() { - return destination.getDestinationStatistics().getEnqueuedMessageTimestamp().getValue(); + MessageFlowStats tmpMessageFlowStats = destination.getDestinationStatistics().getMessageFlowStats(); + return (tmpMessageFlowStats != null ? tmpMessageFlowStats.getEnqueuedMessageTimestamp().getValue() : 0l); } @Override public long getDequeuedMessageBrokerInTime() { - return destination.getDestinationStatistics().getDequeuedMessageBrokerInTime().getValue(); + MessageFlowStats tmpMessageFlowStats = destination.getDestinationStatistics().getMessageFlowStats(); + return (tmpMessageFlowStats != null ? tmpMessageFlowStats.getDequeuedMessageBrokerInTime().getValue() : 0l); } @Override public long getDequeuedMessageBrokerOutTime() { - return destination.getDestinationStatistics().getDequeuedMessageBrokerOutTime().getValue(); + MessageFlowStats tmpMessageFlowStats = destination.getDestinationStatistics().getMessageFlowStats(); + return (tmpMessageFlowStats != null ? tmpMessageFlowStats.getDequeuedMessageBrokerOutTime().getValue() : 0l); } @Override public String getDequeuedMessageClientId() { - return destination.getDestinationStatistics().getDequeuedMessageClientID().getValue(); + MessageFlowStats tmpMessageFlowStats = destination.getDestinationStatistics().getMessageFlowStats(); + return (tmpMessageFlowStats != null ? tmpMessageFlowStats.getDequeuedMessageClientID().getValue() : null); } @Override public String getDequeuedMessageId() { - return destination.getDestinationStatistics().getDequeuedMessageID().getValue(); + MessageFlowStats tmpMessageFlowStats = destination.getDestinationStatistics().getMessageFlowStats(); + return (tmpMessageFlowStats != null ? tmpMessageFlowStats.getDequeuedMessageID().getValue() : null); } @Override public long getDequeuedMessageTimestamp() { - return destination.getDestinationStatistics().getDequeuedMessageTimestamp().getValue(); + MessageFlowStats tmpMessageFlowStats = destination.getDestinationStatistics().getMessageFlowStats(); + return (tmpMessageFlowStats != null ? tmpMessageFlowStats.getDequeuedMessageTimestamp().getValue() : 0l); } - } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java index 49fd17be43..a82e660e11 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java @@ -879,7 +879,6 @@ public boolean isAdvancedNetworkStatisticsEnabled() { @Override public void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatisticsEnabled) { this.advancedNetworkStatisticsEnabled = advancedNetworkStatisticsEnabled; - this.destinationStatistics.setAdvancedStatisticsEnabled(advancedNetworkStatisticsEnabled); } @Override diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java index 2b5335f458..b3918fd2a2 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java @@ -14,58 +14,43 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.activemq.broker.region; import org.apache.activemq.management.CountStatisticImpl; import org.apache.activemq.management.PollCountStatisticImpl; import org.apache.activemq.management.StatsImpl; - -import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; - import org.apache.activemq.management.*; /** - * The J2EE Statistics for the a Destination. - * - * + * The Statistics for a Destination. */ public class DestinationStatistics extends StatsImpl { - protected CountStatisticImpl enqueues; - protected CountStatisticImpl dequeues; - protected CountStatisticImpl forwards; - protected CountStatisticImpl consumers; - protected CountStatisticImpl producers; - protected CountStatisticImpl messages; + protected final CountStatisticImpl enqueues; + protected final CountStatisticImpl dequeues; + protected final CountStatisticImpl forwards; + protected final CountStatisticImpl consumers; + protected final CountStatisticImpl producers; + protected final CountStatisticImpl messages; protected PollCountStatisticImpl messagesCached; - protected CountStatisticImpl dispatched; - protected CountStatisticImpl duplicateFromStore; - protected CountStatisticImpl inflight; - protected CountStatisticImpl expired; - protected TimeStatisticImpl processTime; - protected CountStatisticImpl blockedSends; - protected TimeStatisticImpl blockedTime; - protected SizeStatisticImpl messageSize; - protected CountStatisticImpl maxUncommittedExceededCount; - - // [AMQ-9437] Advanced Network Statistics are optionally enabled - protected final AtomicBoolean advancedNetworkStatisticsEnabled = new AtomicBoolean(false); - protected CountStatisticImpl networkEnqueues; - protected CountStatisticImpl networkDequeues; - - // [AMQ-8463] Advanced Message Statistics are optionally enabled + protected final CountStatisticImpl dispatched; + protected final CountStatisticImpl duplicateFromStore; + protected final CountStatisticImpl inflight; + protected final CountStatisticImpl expired; + protected final TimeStatisticImpl processTime; + protected final CountStatisticImpl blockedSends; + protected final TimeStatisticImpl blockedTime; + protected final SizeStatisticImpl messageSize; + protected final CountStatisticImpl maxUncommittedExceededCount; + + // [AMQ-9437] Advanced Network Statistics + protected final CountStatisticImpl networkEnqueues; + protected final CountStatisticImpl networkDequeues; + + // [AMQ-8463] Advanced Message Statistics are disabled by default protected final AtomicBoolean advancedMessageStatisticsEnabled = new AtomicBoolean(false); - protected UnsampledStatisticImpl enqueuedMessageBrokerInTime; - protected UnsampledStatisticImpl enqueuedMessageClientID; - protected UnsampledStatisticImpl enqueuedMessageID; - protected UnsampledStatisticImpl enqueuedMessageTimestamp; - protected UnsampledStatisticImpl dequeuedMessageBrokerInTime; - protected UnsampledStatisticImpl dequeuedMessageBrokerOutTime; - protected UnsampledStatisticImpl dequeuedMessageClientID; - protected UnsampledStatisticImpl dequeuedMessageID; - protected UnsampledStatisticImpl dequeuedMessageTimestamp; + protected volatile MessageFlowStatsImpl messageFlowStats; public DestinationStatistics() { @@ -90,15 +75,11 @@ public DestinationStatistics() { messageSize = new SizeStatisticImpl("messageSize","Size of messages passing through the destination"); maxUncommittedExceededCount = new CountStatisticImpl("maxUncommittedExceededCount", "number of times maxUncommittedCount has been exceeded"); - addStatistics(Set.of(enqueues, dispatched, dequeues, duplicateFromStore, inflight, expired, consumers, - producers, messages, messagesCached, processTime, blockedSends, blockedTime, messageSize, maxUncommittedExceededCount)); - - if(advancedNetworkStatisticsEnabled.get()) { - enableAdvancedNetworkStatistics(); - } + networkEnqueues = new CountStatisticImpl("networkEnqueues", "The number of messages that have been sent to the destination via network connection"); + networkDequeues = new CountStatisticImpl("networkDequeues", "The number of messages that have been acknowledged from the destination via network connection"); if(advancedMessageStatisticsEnabled.get()) { - enableAdvancedMessageStatistics(); + messageFlowStats = new MessageFlowStatsImpl(); } } @@ -168,7 +149,6 @@ public CountStatisticImpl getMaxUncommittedExceededCount(){ return this.maxUncommittedExceededCount; } - // FIXME: Ugh.. all these would need to be behind a lock as well public CountStatisticImpl getNetworkEnqueues() { return networkEnqueues; } @@ -177,40 +157,8 @@ public CountStatisticImpl getNetworkDequeues() { return networkDequeues; } - public UnsampledStatistic getEnqueuedMessageBrokerInTime() { - return enqueuedMessageBrokerInTime; - } - - public UnsampledStatistic getEnqueuedMessageClientID() { - return enqueuedMessageClientID; - } - - public UnsampledStatistic getEnqueuedMessageID() { - return enqueuedMessageID; - } - - public UnsampledStatistic getEnqueuedMessageTimestamp() { - return enqueuedMessageTimestamp; - } - - public UnsampledStatistic getDequeuedMessageBrokerInTime() { - return dequeuedMessageBrokerInTime; - } - - public UnsampledStatistic getDequeuedMessageBrokerOutTime() { - return dequeuedMessageBrokerOutTime; - } - - public UnsampledStatistic getDequeuedMessageClientID() { - return dequeuedMessageClientID; - } - - public UnsampledStatistic getDequeuedMessageID() { - return dequeuedMessageID; - } - - public UnsampledStatistic getDequeuedMessageTimestamp() { - return dequeuedMessageTimestamp; + public MessageFlowStats getMessageFlowStats() { + return messageFlowStats; } public void reset() { @@ -227,22 +175,12 @@ public void reset() { blockedTime.reset(); messageSize.reset(); maxUncommittedExceededCount.reset(); + networkEnqueues.reset(); + networkDequeues.reset(); - if(advancedMessageStatisticsEnabled.get()) { - enqueuedMessageBrokerInTime.reset(); - enqueuedMessageClientID.reset(); - enqueuedMessageID.reset(); - enqueuedMessageTimestamp.reset(); - dequeuedMessageBrokerInTime.reset(); - dequeuedMessageBrokerOutTime.reset(); - dequeuedMessageClientID.reset(); - dequeuedMessageID.reset(); - dequeuedMessageTimestamp.reset(); - } - - if(advancedNetworkStatisticsEnabled.get()) { - networkEnqueues.reset(); - networkDequeues.reset(); + MessageFlowStatsImpl tmpMessageFlowStats = messageFlowStats; + if(advancedMessageStatisticsEnabled.get() && tmpMessageFlowStats != null) { + tmpMessageFlowStats.reset(); } } } @@ -271,16 +209,10 @@ public void setEnabled(boolean enabled) { networkDequeues.setEnabled(enabled); // [AMQ-9437] Advanced Message Statistics - enqueuedMessageBrokerInTime.setEnabled(enabled); - enqueuedMessageClientID.setEnabled(enabled); - enqueuedMessageID.setEnabled(enabled); - enqueuedMessageTimestamp.setEnabled(enabled); - dequeuedMessageBrokerInTime.setEnabled(enabled); - dequeuedMessageBrokerOutTime.setEnabled(enabled); - dequeuedMessageClientID.setEnabled(enabled); - dequeuedMessageID.setEnabled(enabled); - dequeuedMessageTimestamp.setEnabled(enabled); - + MessageFlowStatsImpl tmpMessageFlowStats = messageFlowStats; + if(tmpMessageFlowStats != null) { + tmpMessageFlowStats.setEnabled(enabled); + } } public void setParent(DestinationStatistics parent) { @@ -327,68 +259,18 @@ public void setParent(DestinationStatistics parent) { } } - // FIXME: This needs to use a reentrant lock instead public synchronized void setAdvancedMessageStatisticsEnabled(boolean advancedMessageStatisticsEnabled) { - synchronized(this.advancedMessageStatisticsEnabled) { - if(!this.advancedMessageStatisticsEnabled.getAndSet(advancedMessageStatisticsEnabled)) { - enableAdvancedMessageStatistics(); - } else { - disableAdvancedMessageStatistics(); - } - } - } - - public synchronized void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatisticsEnabled) { - synchronized(this.advancedNetworkStatisticsEnabled) { - if(this.advancedNetworkStatisticsEnabled.getAndSet(advancedNetworkStatisticsEnabled)) { - enableAdvancedNetworkStatistics(); - } else { - disableAdvancedNetworkStatistics(); - } + if(advancedMessageStatisticsEnabled) { + this.messageFlowStats = new MessageFlowStatsImpl(); + this.messageFlowStats.setEnabled(enabled); + this.advancedMessageStatisticsEnabled.set(advancedMessageStatisticsEnabled); + } else { + this.advancedMessageStatisticsEnabled.set(advancedMessageStatisticsEnabled); + this.messageFlowStats = null; } } - private void enableAdvancedMessageStatistics() { - enqueuedMessageBrokerInTime = new UnsampledStatisticImpl<>("enqueuedMessageBrokerInTime", "ms", "Broker in time (ms) of last enqueued message to the destination", Long.valueOf(0l)); - enqueuedMessageClientID = new UnsampledStatisticImpl<>("enqueuedMessageClientID", "id", "ClientID of last enqueued message to the destination", null); - enqueuedMessageID = new UnsampledStatisticImpl<>("enqueuedMessageID", "id", "MessageID of last enqueued message to the destination", null); - enqueuedMessageTimestamp = new UnsampledStatisticImpl<>("enqueuedMessageTimestamp", "ms", "Message timestamp of last enqueued message to the destination", Long.valueOf(0l)); - - dequeuedMessageBrokerInTime = new UnsampledStatisticImpl<>("dequeuedMessageBrokerInTime", "ms", "Broker in time (ms) of last dequeued message to the destination", Long.valueOf(0l)); - dequeuedMessageBrokerOutTime = new UnsampledStatisticImpl<>("dequeuedMessageBrokerOutTime", "ms", "Broker out time (ms) of last dequeued message to the destination", Long.valueOf(0l)); - dequeuedMessageClientID = new UnsampledStatisticImpl<>("dequeuedMessageClientID", "id", "ClientID of last dequeued message to the destination", null); - dequeuedMessageID = new UnsampledStatisticImpl<>("dequeuedMessageID", "id", "MessageID of last dequeued message to the destination", null); - dequeuedMessageTimestamp = new UnsampledStatisticImpl<>("dequeuedMessageTimestamp", "ms", "Message timestamp of last dequeued message to the destination", Long.valueOf(0l)); - - addStatistics(Set.of(enqueuedMessageBrokerInTime, enqueuedMessageClientID, enqueuedMessageID, enqueuedMessageTimestamp, - dequeuedMessageBrokerInTime, dequeuedMessageBrokerOutTime, dequeuedMessageClientID, dequeuedMessageID, dequeuedMessageTimestamp)); - } - - private void disableAdvancedMessageStatistics() { - removeStatistics(Set.of(enqueuedMessageBrokerInTime, enqueuedMessageClientID, enqueuedMessageID, enqueuedMessageTimestamp, - dequeuedMessageBrokerInTime, dequeuedMessageBrokerOutTime, dequeuedMessageClientID, dequeuedMessageID, dequeuedMessageTimestamp)); - - enqueuedMessageBrokerInTime = null; - enqueuedMessageClientID = null; - enqueuedMessageID = null; - enqueuedMessageTimestamp = null; - - dequeuedMessageBrokerInTime = null; - dequeuedMessageBrokerOutTime = null; - dequeuedMessageClientID = null; - dequeuedMessageID = null; - dequeuedMessageTimestamp = null; - } - - private void enableAdvancedNetworkStatistics() { - networkEnqueues = new CountStatisticImpl("networkEnqueues", "The number of messages that have been sent to the destination via network connection"); - networkDequeues = new CountStatisticImpl("networkDequeues", "The number of messages that have been acknowledged from the destination via network connection"); - addStatistics(Set.of(networkEnqueues, networkDequeues)); - } - - private void disableAdvancedNetworkStatistics() { - removeStatistics(Set.of(networkEnqueues, networkDequeues)); - networkEnqueues = null; - networkDequeues = null; + public synchronized boolean isAdvancedMessageStatisticsEnabled() { + return this.advancedMessageStatisticsEnabled.get(); } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java index 1c8af2d2a0..2533bdfc77 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java @@ -42,6 +42,7 @@ import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.MessageId; import org.apache.activemq.command.RemoveInfo; +import org.apache.activemq.management.MessageFlowStats; import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.transaction.Synchronization; import org.apache.activemq.usage.SystemUsage; @@ -374,12 +375,10 @@ public void afterRollback() throws Exception { if(((Destination)node.getRegionDestination()).isAdvancedNetworkStatisticsEnabled() && getContext() != null && getContext().isNetworkConnection()) { ((Destination)node.getRegionDestination()).getDestinationStatistics().getNetworkDequeues().add(ack.getMessageCount()); } - if(((Destination)node.getRegionDestination()).isAdvancedMessageStatisticsEnabled()) { - ((Destination)node.getRegionDestination()).getDestinationStatistics().getDequeuedMessageBrokerInTime().setValue(node.getMessage().getBrokerInTime()); - ((Destination)node.getRegionDestination()).getDestinationStatistics().getDequeuedMessageBrokerOutTime().setValue(node.getMessage().getBrokerOutTime()); - ((Destination)node.getRegionDestination()).getDestinationStatistics().getDequeuedMessageClientID().setValue(context.getClientId()); - ((Destination)node.getRegionDestination()).getDestinationStatistics().getDequeuedMessageID().setValue(node.getMessageId().toString()); - ((Destination)node.getRegionDestination()).getDestinationStatistics().getDequeuedMessageTimestamp().setValue(node.getMessage().getTimestamp()); + + MessageFlowStats tmpMessageFlowStats = ((Destination)node.getRegionDestination()).getDestinationStatistics().getMessageFlowStats(); + if(((Destination)node.getRegionDestination()).isAdvancedMessageStatisticsEnabled() && tmpMessageFlowStats != null) { + tmpMessageFlowStats.dequeueStats(context.getClientId(), node.getMessageId().toString(), node.getMessage().getTimestamp(), node.getMessage().getBrokerInTime(), node.getMessage().getBrokerOutTime()); } } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index a40d3ad957..2ac2e73b90 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -85,6 +85,7 @@ import org.apache.activemq.filter.BooleanExpression; import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.filter.NonCachedMessageEvaluationContext; +import org.apache.activemq.management.MessageFlowStats; import org.apache.activemq.selector.SelectorParser; import org.apache.activemq.state.ProducerState; import org.apache.activemq.store.IndexListener; @@ -1913,19 +1914,17 @@ public void afterRollback() throws Exception { private void dropMessage(ConnectionContext context, QueueMessageReference reference) { //use dropIfLive so we only process the statistics at most one time if (reference.dropIfLive()) { - getDestinationStatistics().getDequeues().increment(); - getDestinationStatistics().getMessages().decrement(); + destinationStatistics.getDequeues().increment(); + destinationStatistics.getMessages().decrement(); - if(isAdvancedMessageStatisticsEnabled()) { - getDestinationStatistics().getDequeuedMessageBrokerInTime().setValue(reference.getMessage().getBrokerInTime()); - getDestinationStatistics().getDequeuedMessageBrokerOutTime().setValue(reference.getMessage().getBrokerOutTime()); - getDestinationStatistics().getDequeuedMessageClientID().setValue(context.getClientId()); - getDestinationStatistics().getDequeuedMessageID().setValue(reference.getMessageId().toString()); - getDestinationStatistics().getDequeuedMessageTimestamp().setValue(reference.getMessage().getTimestamp()); + MessageFlowStats tmpMessageFlowStats = destinationStatistics.getMessageFlowStats(); + if(isAdvancedMessageStatisticsEnabled() && tmpMessageFlowStats != null) { + Message tmpMessage = reference.getMessage(); + tmpMessageFlowStats.dequeueStats(context.getClientId(), tmpMessage.getMessageId().toString(), tmpMessage.getTimestamp(), tmpMessage.getBrokerInTime(), tmpMessage.getBrokerOutTime()); } if(isAdvancedNetworkStatisticsEnabled() && context.getConnection() != null && context.getConnection().isNetworkConnection()) { - getDestinationStatistics().getNetworkDequeues().increment(); + destinationStatistics.getNetworkDequeues().increment(); } pagedInMessagesLock.writeLock().lock(); @@ -1979,15 +1978,15 @@ private final boolean tryCursorAdd(final Message msg) throws Exception { final void messageSent(final ConnectionContext context, final Message msg) throws Exception { pendingSends.decrementAndGet(); + destinationStatistics.getEnqueues().increment(); destinationStatistics.getMessages().increment(); destinationStatistics.getMessageSize().addSize(msg.getSize()); - if(isAdvancedMessageStatisticsEnabled()) { - destinationStatistics.getEnqueuedMessageBrokerInTime().setValue(msg.getBrokerInTime()); - destinationStatistics.getEnqueuedMessageClientID().setValue(context.getClientId()); - destinationStatistics.getEnqueuedMessageID().setValue(msg.getMessageId().toString()); - destinationStatistics.getEnqueuedMessageTimestamp().setValue(msg.getTimestamp()); + MessageFlowStats tmpMessageFlowStats = destinationStatistics.getMessageFlowStats(); + + if(isAdvancedMessageStatisticsEnabled() && tmpMessageFlowStats != null) { + tmpMessageFlowStats.enqueueStats(context.getClientId(), msg.getMessageId().toString(), msg.getTimestamp(), msg.getBrokerInTime()); } if(isAdvancedNetworkStatisticsEnabled() && context.getConnection() != null && context.getConnection().isNetworkConnection()) { diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java index a7773463dc..3a49b61df6 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -51,6 +51,7 @@ import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.filter.NonCachedMessageEvaluationContext; +import org.apache.activemq.management.MessageFlowStats; import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.NoLocalSubscriptionAware; import org.apache.activemq.store.PersistenceAdapter; @@ -779,11 +780,9 @@ protected void dispatch(final ConnectionContext context, Message message) throws // destinationStatistics.getMessages().increment(); destinationStatistics.getEnqueues().increment(); - if(isAdvancedMessageStatisticsEnabled()) { - destinationStatistics.getEnqueuedMessageBrokerInTime().setValue(message.getBrokerInTime()); - destinationStatistics.getEnqueuedMessageClientID().setValue(context.getClientId()); - destinationStatistics.getEnqueuedMessageID().setValue(message.getMessageId().toString()); - destinationStatistics.getEnqueuedMessageTimestamp().setValue(message.getTimestamp()); + MessageFlowStats tmpMessageFlowStats = destinationStatistics.getMessageFlowStats(); + if(isAdvancedMessageStatisticsEnabled() && tmpMessageFlowStats != null) { + tmpMessageFlowStats.enqueueStats(context.getClientId(), message.getMessageId().toString(), message.getTimestamp(), message.getBrokerInTime()); } if(isAdvancedNetworkStatisticsEnabled() && context != null && context.isNetworkConnection()) { diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java index 6b690bb598..c03b8f60d4 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java @@ -25,6 +25,7 @@ import org.apache.activemq.broker.region.policy.MessageEvictionStrategy; import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy; import org.apache.activemq.command.*; +import org.apache.activemq.management.MessageFlowStats; import org.apache.activemq.thread.Scheduler; import org.apache.activemq.transaction.Synchronization; import org.apache.activemq.transport.TransmitCallback; @@ -453,9 +454,10 @@ private void incrementStatsOnAck(final Destination destination, final MessageAck destination.getDestinationStatistics().getNetworkDequeues().add(count); } } - if(destination.isAdvancedMessageStatisticsEnabled()) { - destination.getDestinationStatistics().getDequeuedMessageClientID().setValue(context.getClientId()); - destination.getDestinationStatistics().getDequeuedMessageID().setValue(ack.getLastMessageId().toString()); + + MessageFlowStats tmpMessageFlowStats = destination.getDestinationStatistics().getMessageFlowStats(); + if(destination.isAdvancedMessageStatisticsEnabled() && tmpMessageFlowStats != null) { + tmpMessageFlowStats.dequeueStats(context.getClientId(), ack.getLastMessageId().toString()); } if (ack.isExpiredAck()) { destination.getDestinationStatistics().getExpired().add(count); diff --git a/activemq-client/src/main/java/org/apache/activemq/management/MessageFlowStats.java b/activemq-client/src/main/java/org/apache/activemq/management/MessageFlowStats.java new file mode 100644 index 0000000000..bbb742b4ec --- /dev/null +++ b/activemq-client/src/main/java/org/apache/activemq/management/MessageFlowStats.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.management; + +public interface MessageFlowStats { + UnsampledStatistic getEnqueuedMessageBrokerInTime(); + UnsampledStatistic getEnqueuedMessageClientID(); + UnsampledStatistic getEnqueuedMessageID(); + UnsampledStatistic getEnqueuedMessageTimestamp(); + UnsampledStatistic getDequeuedMessageBrokerInTime(); + UnsampledStatistic getDequeuedMessageBrokerOutTime(); + UnsampledStatistic getDequeuedMessageClientID(); + UnsampledStatistic getDequeuedMessageID(); + UnsampledStatistic getDequeuedMessageTimestamp(); + void enqueueStats(String clientID, String messageID, long messageTimestamp, long messageBrokerInTime); + void dequeueStats(String clientID, String messageID); + void dequeueStats(String clientID, String messageID, long messageTimestamp, long messageBrokerInTime, long messageBrokerOutTime); +} diff --git a/activemq-client/src/main/java/org/apache/activemq/management/MessageFlowStatsImpl.java b/activemq-client/src/main/java/org/apache/activemq/management/MessageFlowStatsImpl.java new file mode 100644 index 0000000000..3833f2bd79 --- /dev/null +++ b/activemq-client/src/main/java/org/apache/activemq/management/MessageFlowStatsImpl.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.management; + +import java.util.Set; + +public class MessageFlowStatsImpl extends UnsampledStatsImpl implements MessageFlowStats, Statistic, Resettable { + + private final UnsampledStatisticImpl enqueuedMessageBrokerInTime; + private final UnsampledStatisticImpl enqueuedMessageClientID; + private final UnsampledStatisticImpl enqueuedMessageID; + private final UnsampledStatisticImpl enqueuedMessageTimestamp; + private final UnsampledStatisticImpl dequeuedMessageBrokerInTime; + private final UnsampledStatisticImpl dequeuedMessageBrokerOutTime; + private final UnsampledStatisticImpl dequeuedMessageClientID; + private final UnsampledStatisticImpl dequeuedMessageID; + private final UnsampledStatisticImpl dequeuedMessageTimestamp; + + public MessageFlowStatsImpl() { + super(); + + enqueuedMessageBrokerInTime = new UnsampledStatisticImpl<>("enqueuedMessageBrokerInTime", "ms", "Broker in time (ms) of last enqueued message to the destination", Long.valueOf(0l)); + enqueuedMessageClientID = new UnsampledStatisticImpl<>("enqueuedMessageClientID", "id", "ClientID of last enqueued message to the destination", null); + enqueuedMessageID = new UnsampledStatisticImpl<>("enqueuedMessageID", "id", "MessageID of last enqueued message to the destination", null); + enqueuedMessageTimestamp = new UnsampledStatisticImpl<>("enqueuedMessageTimestamp", "ms", "Message timestamp of last enqueued message to the destination", Long.valueOf(0l)); + + dequeuedMessageBrokerInTime = new UnsampledStatisticImpl<>("dequeuedMessageBrokerInTime", "ms", "Broker in time (ms) of last dequeued message to the destination", Long.valueOf(0l)); + dequeuedMessageBrokerOutTime = new UnsampledStatisticImpl<>("dequeuedMessageBrokerOutTime", "ms", "Broker out time (ms) of last dequeued message to the destination", Long.valueOf(0l)); + dequeuedMessageClientID = new UnsampledStatisticImpl<>("dequeuedMessageClientID", "id", "ClientID of last dequeued message to the destination", null); + dequeuedMessageID = new UnsampledStatisticImpl<>("dequeuedMessageID", "id", "MessageID of last dequeued message to the destination", null); + dequeuedMessageTimestamp = new UnsampledStatisticImpl<>("dequeuedMessageTimestamp", "ms", "Message timestamp of last dequeued message to the destination", Long.valueOf(0l)); + + addStatistics(Set.of(enqueuedMessageBrokerInTime, enqueuedMessageClientID, enqueuedMessageID, enqueuedMessageTimestamp, + dequeuedMessageBrokerInTime, dequeuedMessageBrokerOutTime, dequeuedMessageClientID, dequeuedMessageID, dequeuedMessageTimestamp)); + } + + @Override + public UnsampledStatistic getEnqueuedMessageBrokerInTime() { + return enqueuedMessageBrokerInTime; + } + + @Override + public UnsampledStatistic getEnqueuedMessageClientID() { + return enqueuedMessageClientID; + } + + @Override + public UnsampledStatistic getEnqueuedMessageID() { + return enqueuedMessageID; + } + + @Override + public UnsampledStatistic getEnqueuedMessageTimestamp() { + return enqueuedMessageTimestamp; + } + + @Override + public UnsampledStatistic getDequeuedMessageBrokerInTime() { + return dequeuedMessageBrokerInTime; + } + + @Override + public UnsampledStatistic getDequeuedMessageBrokerOutTime() { + return dequeuedMessageBrokerOutTime; + } + + @Override + public UnsampledStatistic getDequeuedMessageClientID() { + return dequeuedMessageClientID; + } + + @Override + public UnsampledStatistic getDequeuedMessageID() { + return dequeuedMessageID; + } + + @Override + public UnsampledStatistic getDequeuedMessageTimestamp() { + return dequeuedMessageTimestamp; + } + + @Override + public synchronized void enqueueStats(String clientID, String messageID, long messageTimestamp, long messageBrokerInTime) { + enqueuedMessageClientID.setValue(clientID); + enqueuedMessageID.setValue(messageID); + enqueuedMessageTimestamp.setValue(messageTimestamp); + enqueuedMessageBrokerInTime.setValue(messageBrokerInTime); + } + + @Override + public synchronized void dequeueStats(String clientID, String messageID) { + dequeuedMessageClientID.setValue(clientID); + dequeuedMessageID.setValue(messageID); + } + + @Override + public synchronized void dequeueStats(String clientID, String messageID, long messageTimestamp, long messageBrokerInTime, long messageBrokerOutTime) { + dequeuedMessageClientID.setValue(clientID); + dequeuedMessageID.setValue(messageID); + dequeuedMessageTimestamp.setValue(messageTimestamp); + dequeuedMessageBrokerInTime.setValue(messageBrokerInTime); + dequeuedMessageBrokerOutTime.setValue(messageBrokerOutTime); + } +} diff --git a/activemq-client/src/main/java/org/apache/activemq/management/StatsImpl.java b/activemq-client/src/main/java/org/apache/activemq/management/StatsImpl.java index 0fda33f115..40fdeca14f 100644 --- a/activemq-client/src/main/java/org/apache/activemq/management/StatsImpl.java +++ b/activemq-client/src/main/java/org/apache/activemq/management/StatsImpl.java @@ -28,7 +28,7 @@ */ public class StatsImpl extends StatisticImpl implements Stats, Resettable { //use a Set instead of a Map - to conserve Space - private Set set; + protected Set set; public StatsImpl() { this(new CopyOnWriteArraySet()); diff --git a/activemq-client/src/main/java/org/apache/activemq/management/UnsampledStatisticImpl.java b/activemq-client/src/main/java/org/apache/activemq/management/UnsampledStatisticImpl.java index 6f1a7864d9..ddd8998cdd 100644 --- a/activemq-client/src/main/java/org/apache/activemq/management/UnsampledStatisticImpl.java +++ b/activemq-client/src/main/java/org/apache/activemq/management/UnsampledStatisticImpl.java @@ -31,7 +31,7 @@ public UnsampledStatisticImpl(String name, String unit, String description, T de } @Override - public synchronized void reset() { + public void reset() { if (isDoReset()) { value = defaultValue; } @@ -51,13 +51,13 @@ public long getLastSampleTime() { } @Override - public synchronized T getValue() { + public T getValue() { return value; } @Override - public synchronized void setValue(T value) { - if (isEnabled()) { + public void setValue(T value) { + if (this.enabled) { this.value = value; } } diff --git a/activemq-client/src/main/java/org/apache/activemq/management/UnsampledStatsImpl.java b/activemq-client/src/main/java/org/apache/activemq/management/UnsampledStatsImpl.java new file mode 100644 index 0000000000..10d5facec2 --- /dev/null +++ b/activemq-client/src/main/java/org/apache/activemq/management/UnsampledStatsImpl.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.management; + +public class UnsampledStatsImpl extends StatsImpl { + + public UnsampledStatsImpl() { + super(); + } + + @Override + protected void updateSampleTime() {} + + @Override + public long getStartTime() { + return 0; + } + @Override + public long getLastSampleTime() { + return 0; + } + + @Override + public synchronized void setEnabled(boolean enabled) { + set.stream().forEach(stat -> stat.setEnabled(enabled)); + } +} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkAdvancedStatisticsTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkAdvancedStatisticsTest.java index a7cd388ae3..e18a66c3b0 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkAdvancedStatisticsTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkAdvancedStatisticsTest.java @@ -39,6 +39,7 @@ import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.management.MessageFlowStats; import org.apache.activemq.util.Wait; import org.apache.activemq.util.Wait.Condition; import org.junit.Test; @@ -175,28 +176,32 @@ public boolean isSatisified() throws Exception { assertEquals(0, remoteBroker.getDestination(includedDestination).getDestinationStatistics().getNetworkDequeues().getCount()); // Advanced Message status - enqueue - assertEquals(lastIncludedSentMessageID, localBroker.getDestination(includedDestination).getDestinationStatistics().getEnqueuedMessageID().getValue()); - assertNotNull(localBroker.getDestination(includedDestination).getDestinationStatistics().getEnqueuedMessageBrokerInTime().getValue()); - assertTrue(localBroker.getDestination(includedDestination).getDestinationStatistics().getEnqueuedMessageBrokerInTime().getValue() > 0l); - assertNotNull(localBroker.getDestination(includedDestination).getDestinationStatistics().getEnqueuedMessageTimestamp().getValue()); - assertTrue(localBroker.getDestination(includedDestination).getDestinationStatistics().getEnqueuedMessageTimestamp().getValue() > 0l); - assertEquals("localClientId", localBroker.getDestination(includedDestination).getDestinationStatistics().getEnqueuedMessageClientID().getValue()); + MessageFlowStats localBrokerIncludedMessageFlowStats = localBroker.getDestination(includedDestination).getDestinationStatistics().getMessageFlowStats(); + MessageFlowStats localBrokerExcludedMessageFlowStats = localBroker.getDestination(excludedDestination).getDestinationStatistics().getMessageFlowStats(); + MessageFlowStats remoteBrokerExcludedMessageFlowStats = remoteBroker.getDestination(excludedDestination).getDestinationStatistics().getMessageFlowStats(); + + assertEquals(lastIncludedSentMessageID, localBrokerIncludedMessageFlowStats.getEnqueuedMessageID().getValue()); + assertNotNull(localBrokerIncludedMessageFlowStats.getEnqueuedMessageBrokerInTime().getValue()); + assertTrue(localBrokerIncludedMessageFlowStats.getEnqueuedMessageBrokerInTime().getValue() > 0l); + assertNotNull(localBrokerIncludedMessageFlowStats.getEnqueuedMessageTimestamp().getValue()); + assertTrue(localBrokerIncludedMessageFlowStats.getEnqueuedMessageTimestamp().getValue() > 0l); + assertEquals("localClientId", localBrokerIncludedMessageFlowStats.getEnqueuedMessageClientID().getValue()); // Advanced Message status - dequeue - assertEquals(lastIncludedSentMessageID, localBroker.getDestination(includedDestination).getDestinationStatistics().getDequeuedMessageID().getValue()); - assertNotNull(localBroker.getDestination(includedDestination).getDestinationStatistics().getDequeuedMessageBrokerInTime().getValue()); - assertNotNull(localBroker.getDestination(includedDestination).getDestinationStatistics().getDequeuedMessageBrokerOutTime().getValue()); - assertTrue(localBroker.getDestination(includedDestination).getDestinationStatistics().getDequeuedMessageClientID().getValue().startsWith("networkConnector")); - assertNotNull(localBroker.getDestination(includedDestination).getDestinationStatistics().getDequeuedMessageTimestamp().getValue()); + assertEquals(lastIncludedSentMessageID, localBrokerIncludedMessageFlowStats.getDequeuedMessageID().getValue()); + assertNotNull(localBrokerIncludedMessageFlowStats.getDequeuedMessageBrokerInTime().getValue()); + assertNotNull(localBrokerIncludedMessageFlowStats.getDequeuedMessageBrokerOutTime().getValue()); + assertTrue(localBrokerIncludedMessageFlowStats.getDequeuedMessageClientID().getValue().startsWith("networkConnector")); + assertNotNull(localBrokerIncludedMessageFlowStats.getDequeuedMessageTimestamp().getValue()); if(includedDestination.isTopic() && !durable) { - assertEquals(Long.valueOf(0l), localBroker.getDestination(includedDestination).getDestinationStatistics().getDequeuedMessageBrokerInTime().getValue()); - assertEquals(Long.valueOf(0l), localBroker.getDestination(includedDestination).getDestinationStatistics().getDequeuedMessageBrokerOutTime().getValue()); - assertEquals(Long.valueOf(0l), localBroker.getDestination(includedDestination).getDestinationStatistics().getDequeuedMessageTimestamp().getValue()); + assertEquals(Long.valueOf(0l), localBrokerIncludedMessageFlowStats.getDequeuedMessageBrokerInTime().getValue()); + assertEquals(Long.valueOf(0l), localBrokerIncludedMessageFlowStats.getDequeuedMessageBrokerOutTime().getValue()); + assertEquals(Long.valueOf(0l), localBrokerIncludedMessageFlowStats.getDequeuedMessageTimestamp().getValue()); } else { - assertTrue(localBroker.getDestination(includedDestination).getDestinationStatistics().getDequeuedMessageBrokerInTime().getValue() > 0l); - assertTrue(localBroker.getDestination(includedDestination).getDestinationStatistics().getDequeuedMessageBrokerOutTime().getValue() > 0l); - assertTrue(localBroker.getDestination(includedDestination).getDestinationStatistics().getDequeuedMessageTimestamp().getValue() > 0l); + assertTrue(localBrokerIncludedMessageFlowStats.getDequeuedMessageBrokerInTime().getValue() > 0l); + assertTrue(localBrokerIncludedMessageFlowStats.getDequeuedMessageBrokerOutTime().getValue() > 0l); + assertTrue(localBrokerIncludedMessageFlowStats.getDequeuedMessageTimestamp().getValue() > 0l); } // Make sure stats do not increment for local-only excluded destinations @@ -209,21 +214,21 @@ public boolean isSatisified() throws Exception { assertEquals(0, remoteBroker.getDestination(excludedDestination).getDestinationStatistics().getForwards().getCount()); assertEquals(0, remoteBroker.getDestination(excludedDestination).getDestinationStatistics().getNetworkEnqueues().getCount()); assertEquals(0, remoteBroker.getDestination(excludedDestination).getDestinationStatistics().getNetworkDequeues().getCount()); - assertEquals(lastExcludedSentMessageID, localBroker.getDestination(excludedDestination).getDestinationStatistics().getEnqueuedMessageID().getValue()); - assertNull(localBroker.getDestination(excludedDestination).getDestinationStatistics().getDequeuedMessageID().getValue()); + assertEquals(lastExcludedSentMessageID, localBrokerExcludedMessageFlowStats.getEnqueuedMessageID().getValue()); + assertNull(localBrokerExcludedMessageFlowStats.getDequeuedMessageID().getValue()); // Advanced Message status - enqueue - assertNull(remoteBroker.getDestination(excludedDestination).getDestinationStatistics().getEnqueuedMessageID().getValue()); - assertEquals(Long.valueOf(0l), remoteBroker.getDestination(excludedDestination).getDestinationStatistics().getEnqueuedMessageBrokerInTime().getValue()); - assertEquals(Long.valueOf(0l), remoteBroker.getDestination(excludedDestination).getDestinationStatistics().getEnqueuedMessageTimestamp().getValue()); - assertNull(remoteBroker.getDestination(excludedDestination).getDestinationStatistics().getEnqueuedMessageClientID().getValue()); + assertNull(remoteBrokerExcludedMessageFlowStats.getEnqueuedMessageID().getValue()); + assertEquals(Long.valueOf(0l), remoteBrokerExcludedMessageFlowStats.getEnqueuedMessageBrokerInTime().getValue()); + assertEquals(Long.valueOf(0l), remoteBrokerExcludedMessageFlowStats.getEnqueuedMessageTimestamp().getValue()); + assertNull(remoteBrokerExcludedMessageFlowStats.getEnqueuedMessageClientID().getValue()); // Advanced Message status - dequeue - assertNull(lastIncludedSentMessageID, remoteBroker.getDestination(excludedDestination).getDestinationStatistics().getDequeuedMessageID().getValue()); - assertEquals(Long.valueOf(0l), remoteBroker.getDestination(excludedDestination).getDestinationStatistics().getDequeuedMessageBrokerInTime().getValue()); - assertEquals(Long.valueOf(0l), remoteBroker.getDestination(excludedDestination).getDestinationStatistics().getDequeuedMessageBrokerOutTime().getValue()); - assertEquals(Long.valueOf(0l), remoteBroker.getDestination(excludedDestination).getDestinationStatistics().getDequeuedMessageTimestamp().getValue()); - assertNull(remoteBroker.getDestination(excludedDestination).getDestinationStatistics().getDequeuedMessageClientID().getValue()); + assertNull(lastIncludedSentMessageID, remoteBrokerExcludedMessageFlowStats.getDequeuedMessageID().getValue()); + assertEquals(Long.valueOf(0l), remoteBrokerExcludedMessageFlowStats.getDequeuedMessageBrokerInTime().getValue()); + assertEquals(Long.valueOf(0l), remoteBrokerExcludedMessageFlowStats.getDequeuedMessageBrokerOutTime().getValue()); + assertEquals(Long.valueOf(0l), remoteBrokerExcludedMessageFlowStats.getDequeuedMessageTimestamp().getValue()); + assertNull(remoteBrokerExcludedMessageFlowStats.getDequeuedMessageClientID().getValue()); if(includedDestination.isTopic()) { assertTrue(Wait.waitFor(new Condition() {