diff --git a/src/groups/mqb/mqba/mqba_clientsession.cpp b/src/groups/mqb/mqba/mqba_clientsession.cpp index 636eec3168..cc8988a65e 100644 --- a/src/groups/mqb/mqba/mqba_clientsession.cpp +++ b/src/groups/mqb/mqba/mqba_clientsession.cpp @@ -584,9 +584,11 @@ void ClientSession::sendAck(bmqt::AckResult::Enum status, // If queue is found, report locally generated NACK if (isSelfGenerated) { - queueState->d_handle_p->queue()->stats()->onEvent( - mqbstat::QueueStatsDomain::EventType::e_NACK, - 1); + queueState->d_handle_p->queue() + ->stats() + ->onEvent( + + 1); } } @@ -1658,9 +1660,9 @@ void ClientSession::onAckEvent(const mqbi::DispatcherAckEvent& event) // Calculate time delta between PUT and ACK const bsls::Types::Int64 timeDelta = bmqsys::Time::highResolutionTimer() - cit->second.d_timeStamp; - queue->stats()->onEvent( - mqbstat::QueueStatsDomain::EventType::e_ACK_TIME, - timeDelta); + queue->stats() + ->onEvent( + timeDelta); if (!d_isClientGeneratingGUIDs) { // Legacy client. @@ -2693,8 +2695,8 @@ ClientSession::ClientSession( this, bdlf::PlaceHolders::_1)); // type - mqbstat::BrokerStats::instance().onEvent( - mqbstat::BrokerStats::EventType::e_CLIENT_CREATED); + mqbstat::BrokerStats::instance() + .onEvent(); BALL_LOG_INFO << description() << ": created " << "[dispatcherProcessor: " << processor @@ -2714,8 +2716,8 @@ ClientSession::~ClientSession() BALL_LOG_INFO << description() << ": destructor"; - mqbstat::BrokerStats::instance().onEvent( - mqbstat::BrokerStats::EventType::e_CLIENT_DESTROYED); + mqbstat::BrokerStats::instance() + .onEvent(); // Unregister from the dispatcher dispatcher()->unregisterClient(this); diff --git a/src/groups/mqb/mqbblp/mqbblp_cluster.cpp b/src/groups/mqb/mqbblp/mqbblp_cluster.cpp index 059227215c..a8b82702d5 100644 --- a/src/groups/mqb/mqbblp/mqbblp_cluster.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_cluster.cpp @@ -418,9 +418,9 @@ void Cluster::sendAck(bmqt::AckResult::Enum status, // If queue exists, report self generated NACK if (isSelfGenerated) { - it->second.d_handle_p->queue()->stats()->onEvent( - mqbstat::QueueStatsDomain::EventType::e_NACK, - 1); + it->second.d_handle_p->queue() + ->stats() + ->onEvent(1); } } else if (!isSelfGenerated) { @@ -467,9 +467,9 @@ void Cluster::sendAck(bmqt::AckResult::Enum status, cit->second.d_subQueueInfosMap.findBySubIdSafe( bmqp::QueueId::k_DEFAULT_SUBQUEUE_ID); if (subQueueCiter != cit->second.d_subQueueInfosMap.end()) { - subQueueCiter->value().d_clientStats->onEvent( - mqbstat::ClusterNodeStats::EventType::e_ACK, - 1); + subQueueCiter->value() + .d_clientStats + ->onEvent(1); } // In the case of Strong Consistency, a Receipt can arrive and trigger // an ACK after Producer closes subStream. @@ -548,7 +548,7 @@ void Cluster::generateNack(bmqt::AckResult::Enum status, options); // Report locally generated NACK - queue->stats()->onEvent(mqbstat::QueueStatsDomain::EventType::e_NACK, 1); + queue->stats()->onEvent(1); bmqu::MemOutStream os; os << description() << ": Failed to relay PUT message " @@ -1206,9 +1206,11 @@ void Cluster::onPutEvent(const mqbi::DispatcherPutEvent& event) queueState.d_subQueueInfosMap.findBySubId( bmqp::QueueId::k_DEFAULT_SUBQUEUE_ID); - subQueueCiter->value().d_clientStats->onEvent( - mqbstat::ClusterNodeStats::EventType::e_PUT, - appDataSp->length()); + subQueueCiter->value() + .d_clientStats + ->onEvent( + + appDataSp->length()); // TBD: groupId: Similar to 'appDataSp' above, load 'optionsSp' here, // using something like PutMessageIterator::loadOptions(). @@ -1644,9 +1646,9 @@ Cluster::validateMessage(mqbi::QueueHandle** queueHandle, if (eventType == bmqp::EventType::e_CONFIRM) { // Update client stats - subQueueIt->value().d_clientStats->onEvent( - mqbstat::ClusterNodeStats::EventType::e_CONFIRM, - 1); + subQueueIt->value() + .d_clientStats + ->onEvent(1); } return ValidationResult::k_SUCCESS; @@ -1912,9 +1914,10 @@ void Cluster::onPushEvent(const mqbi::DispatcherPushEvent& event) queueState.d_subQueueInfosMap.findBySubscriptionId( event.subQueueInfos()[i].id()); - subQueueCiter->value().d_clientStats->onEvent( - mqbstat::ClusterNodeStats::EventType::e_PUSH, - event.blob() ? event.blob()->length() : 0); + subQueueCiter->value() + .d_clientStats + ->onEvent( + event.blob() ? event.blob()->length() : 0); } bmqt::GenericResult::Enum rc = bmqt::GenericResult::e_SUCCESS; diff --git a/src/groups/mqb/mqbblp/mqbblp_domain.cpp b/src/groups/mqb/mqbblp/mqbblp_domain.cpp index 3fd90fb12b..d2d5583ef2 100644 --- a/src/groups/mqb/mqbblp/mqbblp_domain.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_domain.cpp @@ -430,10 +430,10 @@ int Domain::configure(bsl::ostream& errorDescription, d_capacityMeter.setLimits(limits.messages(), limits.bytes()) .setWatermarkThresholds(limits.messagesWatermarkRatio(), limits.bytesWatermarkRatio()); - d_domainsStats.onEvent(mqbstat::DomainStats::EventType::e_CFG_MSGS, - limits.messages()); - d_domainsStats.onEvent(mqbstat::DomainStats::EventType::e_CFG_BYTES, - limits.bytes()); + d_domainsStats.onEvent( + limits.messages()); + d_domainsStats.onEvent( + limits.bytes()); if (isReconfigure) { BSLS_ASSERT_OPT(oldConfig.has_value()); diff --git a/src/groups/mqb/mqbblp/mqbblp_localqueue.cpp b/src/groups/mqb/mqbblp/mqbblp_localqueue.cpp index 208ad42053..d10ecaafeb 100644 --- a/src/groups/mqb/mqbblp/mqbblp_localqueue.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_localqueue.cpp @@ -173,17 +173,17 @@ int LocalQueue::configure(bsl::ostream& errorDescription, bool isReconfigure) d_state_p->uri(), d_state_p->partitionId()); - d_state_p->stats()->onEvent( - mqbstat::QueueStatsDomain::EventType::e_CHANGE_ROLE, - mqbstat::QueueStatsDomain::Role::e_PRIMARY); + d_state_p->stats() + ->onEvent( + mqbstat::QueueStatsDomain::Role::e_PRIMARY); - d_state_p->stats()->onEvent( - mqbstat::QueueStatsDomain::EventType::e_CFG_MSGS, - domainCfg.storage().queueLimits().messages()); + d_state_p->stats() + ->onEvent( + domainCfg.storage().queueLimits().messages()); - d_state_p->stats()->onEvent( - mqbstat::QueueStatsDomain::EventType::e_CFG_BYTES, - domainCfg.storage().queueLimits().bytes()); + d_state_p->stats() + ->onEvent( + domainCfg.storage().queueLimits().bytes()); if (isReconfigure) { if (domainCfg.mode().isFanoutValue()) { @@ -482,9 +482,9 @@ void LocalQueue::postMessage(const bmqp::PutHeader& putHeader, // Calculate time delta between PUT and ACK const bsls::Types::Int64 timeDelta = bmqsys::Time::highResolutionTimer() - timePoint; - d_state_p->stats()->onEvent( - mqbstat::QueueStatsDomain::EventType::e_ACK_TIME, - timeDelta); + d_state_p->stats() + ->onEvent( + timeDelta); if (res != mqbi::StorageResult::e_SUCCESS || doAck) { bmqp::AckMessage ackMessage; ackMessage @@ -509,9 +509,9 @@ void LocalQueue::postMessage(const bmqp::PutHeader& putHeader, // flushed (which occurs in 'flush' routine). In no case should // 'afterNewMessage' be called here. - d_state_p->stats()->onEvent( - mqbstat::QueueStatsDomain::EventType::e_PUT, - appData->length()); + d_state_p->stats() + ->onEvent( + appData->length()); } else { BSLS_PERFORMANCEHINT_UNLIKELY_HINT; @@ -525,9 +525,8 @@ void LocalQueue::postMessage(const bmqp::PutHeader& putHeader, } } else { - d_state_p->stats()->onEvent( - mqbstat::QueueStatsDomain::EventType::e_NACK, - 1); + d_state_p->stats() + ->onEvent(1); } } } @@ -548,9 +547,8 @@ void LocalQueue::onReceipt(const bmqt::MessageGUID& msgGUID, const bsls::Types::Int64 timeDelta = bmqsys::Time::highResolutionTimer() - arrivalTimepoint; - d_state_p->stats()->onEvent( - mqbstat::QueueStatsDomain::EventType::e_ACK_TIME, - timeDelta); + d_state_p->stats() + ->onEvent(timeDelta); if (d_state_p->handleCatalog().hasHandle(qH)) { // Send acknowledgement @@ -572,8 +570,8 @@ void LocalQueue::onRemoval(const bmqt::MessageGUID& msgGUID, // TODO: do we need to update NACK stats considering that downstream can // NACK the same GUID as well? - d_state_p->stats()->onEvent(mqbstat::QueueStatsDomain::EventType::e_NACK, - 1); + d_state_p->stats()->onEvent( + 1); if (d_state_p->handleCatalog().hasHandle(qH)) { // Send negative acknowledgement diff --git a/src/groups/mqb/mqbblp/mqbblp_queueengineutil.cpp b/src/groups/mqb/mqbblp/mqbblp_queueengineutil.cpp index c1bd5fc1ec..69c6412b47 100644 --- a/src/groups/mqb/mqbblp/mqbblp_queueengineutil.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_queueengineutil.cpp @@ -1452,9 +1452,9 @@ void QueueEngineUtil_AppState::reportStats( message->attributes()); // First report 'queue time' metric for the entire queue - d_queue_p->stats()->onEvent( - mqbstat::QueueStatsDomain::EventType::e_QUEUE_TIME, - timeDelta); + d_queue_p->stats() + ->onEvent( + timeDelta); // Then report 'queue time' metric for appId d_queue_p->stats()->onEvent( diff --git a/src/groups/mqb/mqbblp/mqbblp_queuehandle.cpp b/src/groups/mqb/mqbblp/mqbblp_queuehandle.cpp index 42ffca0c72..f3a240ad9b 100644 --- a/src/groups/mqb/mqbblp/mqbblp_queuehandle.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_queuehandle.cpp @@ -299,9 +299,9 @@ QueueHandle::updateMonitor(const bsl::shared_ptr& subStream, // The message exist in the storage, we likely are in situation // (3), so it's a legit confirm and therefore, update the domain // stats. - d_domainStats_p->onEvent( - mqbstat::QueueStatsDomain::EventType::e_CONFIRM, - msgSize); + d_domainStats_p + ->onEvent( + msgSize); } BALL_LOG_INFO_BLOCK @@ -404,15 +404,15 @@ mqbu::ResourceUsageMonitorStateTransition::Enum QueueHandle::updateMonitor( if (type == bmqp::EventType::e_CONFIRM) { // Update domain stats - d_domainStats_p->onEvent( - mqbstat::QueueStatsDomain::EventType::e_CONFIRM, - msgSize); + d_domainStats_p + ->onEvent( + msgSize); // Report CONFIRM time only at first hop // Note that we update metric per entire queue and also per `appId` if (d_clientContext_sp->isFirstHop()) { - d_domainStats_p->onEvent( - mqbstat::QueueStatsDomain::EventType::e_CONFIRM_TIME, + d_domainStats_p->onEvent< + mqbstat::QueueStatsDomain::EventType::e_CONFIRM_TIME>( timeDelta); d_domainStats_p->onEvent( mqbstat::QueueStatsDomain::EventType::e_CONFIRM_TIME, @@ -498,8 +498,8 @@ void QueueHandle::deliverMessageImpl( BSLS_ASSERT_SAFE(subQueueInfos.size() >= 1 && subQueueInfos.size() <= d_subscriptions.size()); - d_domainStats_p->onEvent(mqbstat::QueueStatsDomain::EventType::e_PUSH, - msgSize); + d_domainStats_p->onEvent( + msgSize); // Create an event to dispatch delivery of the message to the client mqbi::DispatcherClient* client = d_clientContext_sp->client(); @@ -1208,7 +1208,7 @@ void QueueHandle::onAckMessage(const bmqp::AckMessage& ackMessage) ackMsg.setQueueId(id()); client->dispatcher()->dispatchEvent(event, client); - d_domainStats_p->onEvent(mqbstat::QueueStatsDomain::EventType::e_ACK, 1); + d_domainStats_p->onEvent(1); } bool QueueHandle::canDeliver(unsigned int downstreamSubscriptionId) const diff --git a/src/groups/mqb/mqbblp/mqbblp_queuestate.cpp b/src/groups/mqb/mqbblp/mqbblp_queuestate.cpp index 6070ba0672..86ec95caa2 100644 --- a/src/groups/mqb/mqbblp/mqbblp_queuestate.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_queuestate.cpp @@ -88,14 +88,14 @@ QueueState::QueueState(mqbi::Queue* queue, // NOTE: The 'description' will be set by the owner of this object. - mqbstat::BrokerStats::instance().onEvent( - mqbstat::BrokerStats::EventType::e_QUEUE_CREATED); + mqbstat::BrokerStats::instance() + .onEvent(); } QueueState::~QueueState() { - mqbstat::BrokerStats::instance().onEvent( - mqbstat::BrokerStats::EventType::e_QUEUE_DESTROYED); + mqbstat::BrokerStats::instance() + .onEvent(); } void QueueState::add(const bmqp_ctrlmsg::QueueHandleParameters& params) diff --git a/src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp b/src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp index 9c8057c5e3..06795b7be9 100644 --- a/src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp @@ -158,9 +158,9 @@ int RemoteQueue::configureAsProxy(bsl::ostream& errorDescription, return 10 * rc + rc_QUEUE_ENGINE_CFG_FAILURE; // RETURN } - d_state_p->stats()->onEvent( - mqbstat::QueueStatsDomain::EventType::e_CHANGE_ROLE, - mqbstat::QueueStatsDomain::Role::e_PROXY); + d_state_p->stats() + ->onEvent( + mqbstat::QueueStatsDomain::Role::e_PROXY); BALL_LOG_INFO << "Created a ProxyRemoteQueue " @@ -276,9 +276,9 @@ int RemoteQueue::configureAsClusterMember(bsl::ostream& errorDescription, d_state_p->storageManager()->setQueueRaw(queue, d_state_p->uri(), d_state_p->partitionId()); - d_state_p->stats()->onEvent( - mqbstat::QueueStatsDomain::EventType::e_CHANGE_ROLE, - mqbstat::QueueStatsDomain::Role::e_REPLICA); + d_state_p->stats() + ->onEvent( + mqbstat::QueueStatsDomain::Role::e_REPLICA); BALL_LOG_INFO << d_state_p->domain()->cluster()->name() << ": Created a ClusterMemberRemoteQueue " @@ -922,9 +922,8 @@ void RemoteQueue::postMessage(const bmqp::PutHeader& putHeaderIn, bmqt::AckResult::e_REFUSED)); ackMessage.setMessageGUID(putHeader.messageGUID()); - d_state_p->stats()->onEvent( - mqbstat::QueueStatsDomain::EventType::e_NACK, - 1); + d_state_p->stats() + ->onEvent(1); // CorrelationId & QueueId are left unset as those fields // will be filled downstream. @@ -996,8 +995,9 @@ void RemoteQueue::postMessage(const bmqp::PutHeader& putHeaderIn, // the time the message is actually sent upstream, i.e. in // cluster/clusterProxy) for the most exact accuracy, but doing it here is // good enough. - d_state_p->stats()->onEvent(mqbstat::QueueStatsDomain::EventType::e_PUT, - appData->length()); + + d_state_p->stats()->onEvent( + appData->length()); } void RemoteQueue::confirmMessage(const bmqt::MessageGUID& msgGUID, @@ -1496,8 +1496,8 @@ RemoteQueue::Puts::iterator& RemoteQueue::nack(Puts::iterator& it, { ackMessage.setMessageGUID(it->first); - d_state_p->stats()->onEvent(mqbstat::QueueStatsDomain::EventType::e_NACK, - 1); + d_state_p->stats()->onEvent( + 1); // CorrelationId & QueueId are left unset as those fields // will be filled downstream. diff --git a/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp b/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp index 68435607bf..06114746dd 100644 --- a/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp @@ -1301,9 +1301,10 @@ void RootQueueEngine::afterNewMessage( } if (!d_appsDeliveryContext.isEmpty()) { // Report 'queue time' metric for the entire queue - d_queueState_p->queue()->stats()->onEvent( - mqbstat::QueueStatsDomain::EventType::e_QUEUE_TIME, - d_appsDeliveryContext.timeDelta()); + d_queueState_p->queue() + ->stats() + ->onEvent( + d_appsDeliveryContext.timeDelta()); } d_appsDeliveryContext.deliverMessage(); } @@ -1518,9 +1519,9 @@ int RootQueueEngine::onRejectMessage(mqbi::QueueHandle* handle, d_queueState_p, d_allocator_p)); } - d_queueState_p->stats()->onEvent( - mqbstat::QueueStatsDomain::EventType::e_REJECT, - 1); + d_queueState_p->stats() + ->onEvent( + 1); // Lastly, if message reached a ref count of zero in the // storage (i.e., all appIds have confirmed the message), diff --git a/src/groups/mqb/mqbc/mqbc_clusterstate.cpp b/src/groups/mqb/mqbc/mqbc_clusterstate.cpp index 3d47083e03..2d2ca30464 100644 --- a/src/groups/mqb/mqbc/mqbc_clusterstate.cpp +++ b/src/groups/mqb/mqbc/mqbc_clusterstate.cpp @@ -552,9 +552,10 @@ void ClusterState::DomainState::adjustQueueCount(int by) d_numAssignedQueues += by; if (d_domain_p != 0) { - d_domain_p->domainStats()->onEvent( - mqbstat::DomainStats::EventType::e_QUEUE_COUNT, - d_numAssignedQueues); + d_domain_p->domainStats() + ->onEvent( + + d_numAssignedQueues); } } diff --git a/src/groups/mqb/mqbs/mqbs_filebackedstorage.cpp b/src/groups/mqb/mqbs/mqbs_filebackedstorage.cpp index 1dff4b7626..abe445cfab 100644 --- a/src/groups/mqb/mqbs/mqbs_filebackedstorage.cpp +++ b/src/groups/mqb/mqbs/mqbs_filebackedstorage.cpp @@ -89,11 +89,11 @@ void FileBackedStorage::purgeCommon(const mqbu::StorageKey& appKey) // Update stats d_capacityMeter.clear(); - d_queueStats_sp->onEvent(mqbstat::QueueStatsDomain::EventType::e_PURGE, - 0); - d_queueStats_sp->onEvent( - mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY, - d_handles.historySize()); + d_queueStats_sp + ->onEvent(0); + d_queueStats_sp + ->onEvent( + d_handles.historySize()); } } @@ -359,9 +359,11 @@ FileBackedStorage::put(mqbi::StorageMessageAttributes* attributes, d_currentlyAutoConfirming = bmqt::MessageGUID(); BSLS_ASSERT_SAFE(queue()); - queue()->stats()->onEvent( - mqbstat::QueueStatsDomain::EventType::e_ADD_MESSAGE, - msgSize); + queue() + ->stats() + ->onEvent( + + msgSize); d_isEmpty.storeRelaxed(0); @@ -479,9 +481,9 @@ FileBackedStorage::releaseRef(const bmqt::MessageGUID& guid) if (queue()) { queue()->queueEngine()->beforeMessageRemoved(guid); } - d_queueStats_sp->onEvent( - mqbstat::QueueStatsDomain::EventType::e_DEL_MESSAGE, - msgLen); + d_queueStats_sp + ->onEvent( + msgLen); // There is not really a need to remove the guid from all virtual // storages, because we can be here only if guid doesn't exist in @@ -496,9 +498,9 @@ FileBackedStorage::releaseRef(const bmqt::MessageGUID& guid) d_capacityMeter.remove(1, msgLen); d_handles.erase(it); - d_queueStats_sp->onEvent( - mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY, - d_handles.historySize()); + d_queueStats_sp + ->onEvent( + d_handles.historySize()); return mqbi::StorageResult::e_ZERO_REFERENCES; } @@ -544,12 +546,16 @@ FileBackedStorage::remove(const bmqt::MessageGUID& msgGUID, int* msgSize) d_capacityMeter.remove(1, msgLen); BSLS_ASSERT_SAFE(queue()); - queue()->stats()->onEvent( - mqbstat::QueueStatsDomain::EventType::e_DEL_MESSAGE, - msgLen); - queue()->stats()->onEvent( - mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY, - d_handles.historySize()); + queue() + ->stats() + ->onEvent( + + msgLen); + queue() + ->stats() + ->onEvent( + + d_handles.historySize()); if (msgSize) { *msgSize = msgLen; @@ -604,9 +610,9 @@ FileBackedStorage::removeAll(const mqbu::StorageKey& appKey) d_isEmpty.storeRelaxed(1); } - d_queueStats_sp->onEvent( - mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY, - d_handles.historySize()); + d_queueStats_sp + ->onEvent( + d_handles.historySize()); return mqbi::StorageResult::e_SUCCESS; } @@ -694,9 +700,9 @@ int FileBackedStorage::gcExpiredMessages( if (queue()) { queue()->queueEngine()->beforeMessageRemoved(cit->first); } - d_queueStats_sp->onEvent( - mqbstat::QueueStatsDomain::EventType::e_DEL_MESSAGE, - msgLen); + d_queueStats_sp + ->onEvent( + msgLen); // Remove message from all virtual storages. d_virtualStorageCatalog.gc(cit->first); @@ -715,18 +721,18 @@ int FileBackedStorage::gcExpiredMessages( if (numMsgsDeleted > 0) { if (numMsgsDeleted > numMsgsUnreceipted) { - d_queueStats_sp->onEvent( - mqbstat::QueueStatsDomain::EventType::e_GC_MESSAGE, - numMsgsDeleted - numMsgsUnreceipted); + d_queueStats_sp + ->onEvent( + numMsgsDeleted - numMsgsUnreceipted); } if (numMsgsUnreceipted) { - d_queueStats_sp->onEvent( - mqbstat::QueueStatsDomain::EventType::e_NO_SC_MESSAGE, + d_queueStats_sp->onEvent< + mqbstat::QueueStatsDomain::EventType::e_NO_SC_MESSAGE>( numMsgsUnreceipted); } - d_queueStats_sp->onEvent( - mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY, - d_handles.historySize()); + d_queueStats_sp + ->onEvent( + d_handles.historySize()); } if (d_handles.empty()) { @@ -741,9 +747,9 @@ bool FileBackedStorage::gcHistory() bool hasMoreToGc = d_handles.gc(bmqsys::Time::highResolutionTimer(), k_GC_MESSAGES_BATCH_SIZE); - d_queueStats_sp->onEvent( - mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY, - d_handles.historySize()); + d_queueStats_sp + ->onEvent( + d_handles.historySize()); return hasMoreToGc; } @@ -798,9 +804,9 @@ void FileBackedStorage::processMessageRecord( // Update the messages & bytes monitors, and the stats. d_capacityMeter.forceCommit(1, msgLen); // Return value ignored. - d_queueStats_sp->onEvent( - mqbstat::QueueStatsDomain::EventType::e_ADD_MESSAGE, - msgLen); + d_queueStats_sp + ->onEvent( + msgLen); d_isEmpty.storeRelaxed(0); } @@ -914,9 +920,8 @@ void FileBackedStorage::processDeletionRecord(const bmqt::MessageGUID& guid) if (queue()) { queue()->queueEngine()->beforeMessageRemoved(guid); } - d_queueStats_sp->onEvent( - mqbstat::QueueStatsDomain::EventType::e_DEL_MESSAGE, - msgLen); + d_queueStats_sp + ->onEvent(msgLen); // Delete 'guid' from all virtual storages, if any. Note that 'guid' // should have already been removed from each virtual storage when confirm @@ -942,9 +947,9 @@ void FileBackedStorage::processDeletionRecord(const bmqt::MessageGUID& guid) d_isEmpty.storeRelaxed(1); } - d_queueStats_sp->onEvent( - mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY, - d_handles.historySize()); + d_queueStats_sp + ->onEvent( + d_handles.historySize()); } void FileBackedStorage::addQueueOpRecordHandle( @@ -1014,9 +1019,9 @@ FileBackedStorage::autoConfirm(const mqbu::StorageKey& appKey, void FileBackedStorage::setPrimary() { - d_queueStats_sp->onEvent( - mqbstat::QueueStatsDomain::EventType::e_CHANGE_ROLE, - mqbstat::QueueStatsDomain::Role::e_PRIMARY); + d_queueStats_sp + ->onEvent( + mqbstat::QueueStatsDomain::Role::e_PRIMARY); } void FileBackedStorage::clearSelection() diff --git a/src/groups/mqb/mqbs/mqbs_inmemorystorage.cpp b/src/groups/mqb/mqbs/mqbs_inmemorystorage.cpp index 37bd3450e7..191d0fb2be 100644 --- a/src/groups/mqb/mqbs/mqbs_inmemorystorage.cpp +++ b/src/groups/mqb/mqbs/mqbs_inmemorystorage.cpp @@ -220,9 +220,11 @@ InMemoryStorage::put(mqbi::StorageMessageAttributes* attributes, d_currentlyAutoConfirming = bmqt::MessageGUID(); if (queue()) { - queue()->stats()->onEvent( - mqbstat::QueueStatsDomain::EventType::e_ADD_MESSAGE, - msgSize); + queue() + ->stats() + ->onEvent( + + msgSize); } d_isEmpty.storeRelaxed(0); @@ -309,9 +311,10 @@ InMemoryStorage::releaseRef(const bmqt::MessageGUID& guid) d_capacityMeter.remove(1, msgLen); if (queue()) { queue()->queueEngine()->beforeMessageRemoved(guid); - queue()->stats()->onEvent( - mqbstat::QueueStatsDomain::EventType::e_DEL_MESSAGE, - msgLen); + queue() + ->stats() + ->onEvent( + msgLen); } // There is not really a need to remove the guid from all virtual @@ -324,9 +327,11 @@ InMemoryStorage::releaseRef(const bmqt::MessageGUID& guid) d_items.erase(it); if (queue()) { - queue()->stats()->onEvent( - mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY, - d_items.historySize()); + queue() + ->stats() + ->onEvent< + mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY>( + d_items.historySize()); } return mqbi::StorageResult::e_ZERO_REFERENCES; // RETURN @@ -353,12 +358,14 @@ InMemoryStorage::remove(const bmqt::MessageGUID& msgGUID, int* msgSize) d_capacityMeter.remove(1, msgLen); if (queue()) { - queue()->stats()->onEvent( - mqbstat::QueueStatsDomain::EventType::e_DEL_MESSAGE, - msgLen); - queue()->stats()->onEvent( - mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY, - d_items.historySize()); + queue() + ->stats() + ->onEvent( + msgLen); + queue() + ->stats() + ->onEvent( + d_items.historySize()); } if (msgSize) { @@ -383,9 +390,9 @@ InMemoryStorage::removeAll(const mqbu::StorageKey& appKey) d_capacityMeter.clear(); if (queue()) { - queue()->stats()->onEvent( - mqbstat::QueueStatsDomain::EventType::e_PURGE, - 0); + queue() + ->stats() + ->onEvent(0); } d_isEmpty.storeRelaxed(1); @@ -413,9 +420,10 @@ InMemoryStorage::removeAll(const mqbu::StorageKey& appKey) } if (queue()) { - queue()->stats()->onEvent( - mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY, - d_items.historySize()); + queue() + ->stats() + ->onEvent( + d_items.historySize()); } return mqbi::StorageResult::e_SUCCESS; @@ -454,9 +462,10 @@ int InMemoryStorage::gcExpiredMessages( d_capacityMeter.remove(1, msgLen); if (queue()) { queue()->queueEngine()->beforeMessageRemoved(cit->first); - queue()->stats()->onEvent( - mqbstat::QueueStatsDomain::EventType::e_DEL_MESSAGE, - msgLen); + queue() + ->stats() + ->onEvent( + msgLen); } // Remove message from all virtual storages and the physical (this) @@ -467,12 +476,14 @@ int InMemoryStorage::gcExpiredMessages( } if (queue() && (numMsgsDeleted > 0)) { - queue()->stats()->onEvent( - mqbstat::QueueStatsDomain::EventType::e_GC_MESSAGE, - numMsgsDeleted); - queue()->stats()->onEvent( - mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY, - d_items.historySize()); + queue() + ->stats() + ->onEvent( + numMsgsDeleted); + queue() + ->stats() + ->onEvent( + d_items.historySize()); } if (d_items.empty()) { @@ -488,9 +499,10 @@ bool InMemoryStorage::gcHistory() k_GC_MESSAGES_BATCH_SIZE); if (queue()) { - queue()->stats()->onEvent( - mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY, - d_items.historySize()); + queue() + ->stats() + ->onEvent( + d_items.historySize()); } return hasMoreToGc; diff --git a/src/groups/mqb/mqbstat/mqbstat_brokerstats.cpp b/src/groups/mqb/mqbstat/mqbstat_brokerstats.cpp index ad1fbb04cc..ffc58d6721 100644 --- a/src/groups/mqb/mqbstat/mqbstat_brokerstats.cpp +++ b/src/groups/mqb/mqbstat/mqbstat_brokerstats.cpp @@ -40,16 +40,6 @@ namespace { /// Name of the stat context to create (holding all broker's statistics) static const char k_BROKER_STAT_NAME[] = "broker"; -//------------------------ -// struct BrokerStatsIndex -//------------------------ - -/// Namespace for the constants of stat values that applies to the queues -/// from the clients -struct BrokerStatsIndex { - enum Enum { e_STAT_QUEUE_COUNT, e_STAT_CLIENT_COUNT }; -}; - } // close unnamed namespace // ----------------- @@ -110,30 +100,6 @@ void BrokerStats::initialize(bmqst::StatContext* brokerStatContext) d_statContext_p = brokerStatContext; } -void BrokerStats::onEvent(EventType::Enum type) -{ - BSLS_ASSERT_SAFE(d_statContext_p && "initialize was not called"); - - switch (type) { - case EventType::e_CLIENT_CREATED: { - d_statContext_p->adjustValue(BrokerStatsIndex::e_STAT_CLIENT_COUNT, 1); - } break; - case EventType::e_CLIENT_DESTROYED: { - d_statContext_p->adjustValue(BrokerStatsIndex::e_STAT_CLIENT_COUNT, - -1); - } break; - case EventType::e_QUEUE_CREATED: { - d_statContext_p->adjustValue(BrokerStatsIndex::e_STAT_QUEUE_COUNT, 1); - } break; - case EventType::e_QUEUE_DESTROYED: { - d_statContext_p->adjustValue(BrokerStatsIndex::e_STAT_QUEUE_COUNT, -1); - } break; - default: { - BSLS_ASSERT_SAFE(false && "Unknown event type"); - } break; - }; -} - // --------------------- // class BrokerStatsUtil // --------------------- diff --git a/src/groups/mqb/mqbstat/mqbstat_brokerstats.h b/src/groups/mqb/mqbstat/mqbstat_brokerstats.h index c803f66bf9..005e7c0182 100644 --- a/src/groups/mqb/mqbstat/mqbstat_brokerstats.h +++ b/src/groups/mqb/mqbstat/mqbstat_brokerstats.h @@ -30,6 +30,7 @@ // MQB // BMQ +#include #include // BDE @@ -42,11 +43,6 @@ namespace BloombergLP { -// FORWARD DECLARATION -namespace bmqst { -class StatContext; -} - namespace mqbstat { // ================= @@ -84,6 +80,14 @@ class BrokerStats { // DATA bmqst::StatContext* d_statContext_p; // StatContext + // PRIVATE TYPES + + /// Namespace for the constants of stat values that applies to the queues + /// from the clients + struct BrokerStatsIndex { + enum Enum { e_STAT_QUEUE_COUNT, e_STAT_CLIENT_COUNT }; + }; + private: // NOT IMPLEMENTED BrokerStats(const BrokerStats&) BSLS_CPP11_DELETED; @@ -124,7 +128,8 @@ class BrokerStats { /// Update statistics for the event of the specified `type` and with the /// specified `value` (depending on the `type`, `value` can represent /// the number of bytes, a counter, ... - void onEvent(EventType::Enum type); + template + void onEvent(); /// Return a pointer to the statcontext. bmqst::StatContext* statContext(); @@ -159,6 +164,42 @@ inline bmqst::StatContext* BrokerStats::statContext() return d_statContext_p; } +template <> +inline void +BrokerStats::onEvent() +{ + BSLS_ASSERT_SAFE(d_statContext_p && "initialize was not called"); + + d_statContext_p->adjustValue(BrokerStatsIndex::e_STAT_CLIENT_COUNT, 1); +} + +template <> +inline void +BrokerStats::onEvent() +{ + BSLS_ASSERT_SAFE(d_statContext_p && "initialize was not called"); + + d_statContext_p->adjustValue(BrokerStatsIndex::e_STAT_CLIENT_COUNT, -1); +} + +template <> +inline void +BrokerStats::onEvent() +{ + BSLS_ASSERT_SAFE(d_statContext_p && "initialize was not called"); + + d_statContext_p->adjustValue(BrokerStatsIndex::e_STAT_QUEUE_COUNT, 1); +} + +template <> +inline void +BrokerStats::onEvent() +{ + BSLS_ASSERT_SAFE(d_statContext_p && "initialize was not called"); + + d_statContext_p->adjustValue(BrokerStatsIndex::e_STAT_QUEUE_COUNT, -1); +} + } // close package namespace } // close enterprise namespace diff --git a/src/groups/mqb/mqbstat/mqbstat_clusterstats.cpp b/src/groups/mqb/mqbstat/mqbstat_clusterstats.cpp index 3550d813ad..42d3c5c97c 100644 --- a/src/groups/mqb/mqbstat/mqbstat_clusterstats.cpp +++ b/src/groups/mqb/mqbstat/mqbstat_clusterstats.cpp @@ -85,35 +85,6 @@ struct ClusterStatsIndex { }; }; -// ---------------------------- -// struct ClusterNodeStatsIndex -// ---------------------------- - -/// Namespace for the constants of stat values that applies to the -/// cluster node -struct ClusterNodeStatsIndex { - enum Enum { - /// Value: Number of ack messages delivered to the client - e_STAT_ACK - - , - e_STAT_CONFIRM - // Value: Number of confirm messages delivered to the client - - , - e_STAT_PUSH - // Value: Accumulated bytes of all messages ever pushed to - // the client - // Increments: Number of messages ever pushed to the client - - , - e_STAT_PUT - // Value: Accumulated bytes of all messages ever received from - // the client - // Increments: Number of messages ever received from the client - }; -}; - //------------------------- // struct ClusterStatsIndex //------------------------- @@ -517,34 +488,6 @@ void ClusterNodeStats::initialize(const bmqt::Uri& uri, bmqst::StatContextConfiguration(uri.asString(), &localAllocator)); } -void ClusterNodeStats::onEvent(EventType::Enum type, bsls::Types::Int64 value) -{ - BSLS_ASSERT_SAFE(d_statContext_mp && "initialize was not called"); - - switch (type) { - case EventType::e_ACK: { - // For ACK, we don't have any bytes value, but we also wouldn't care .. - d_statContext_mp->adjustValue(ClusterNodeStatsIndex::e_STAT_ACK, 1); - } break; - case EventType::e_CONFIRM: { - // For CONFIRM, we don't care about the bytes value .. - d_statContext_mp->adjustValue(ClusterNodeStatsIndex::e_STAT_CONFIRM, - 1); - } break; - case EventType::e_PUSH: { - d_statContext_mp->adjustValue(ClusterNodeStatsIndex::e_STAT_PUSH, - value); - } break; - case EventType::e_PUT: { - d_statContext_mp->adjustValue(ClusterNodeStatsIndex::e_STAT_PUT, - value); - } break; - default: { - BSLS_ASSERT_SAFE(false && "Unknown event type"); - } break; - }; -} - // ---------------------- // class ClusterStatsUtil // ---------------------- diff --git a/src/groups/mqb/mqbstat/mqbstat_clusterstats.h b/src/groups/mqb/mqbstat/mqbstat_clusterstats.h index 3016166a2a..3d7d5f78c2 100644 --- a/src/groups/mqb/mqbstat/mqbstat_clusterstats.h +++ b/src/groups/mqb/mqbstat/mqbstat_clusterstats.h @@ -30,6 +30,7 @@ // utility namespace exposing methods to initialize the stat contexts. // BMQ +#include #include // MQB @@ -47,11 +48,6 @@ namespace BloombergLP { -// FORWARD DECLARATION -namespace bmqst { -class StatContext; -} - namespace mqbstat { // ================== @@ -303,6 +299,33 @@ class ClusterNodeStats { bslma::ManagedPtr d_statContext_mp; // StatContext + // PRIVATE TYPES + + /// Namespace for the constants of stat values that applies to the + /// cluster node + struct ClusterNodeStatsIndex { + enum Enum { + /// Value: Number of ack messages delivered to the client + e_STAT_ACK + + , + e_STAT_CONFIRM + // Value: Number of confirm messages delivered to the client + + , + e_STAT_PUSH + // Value: Accumulated bytes of all messages ever pushed to + // the client + // Increments: Number of messages ever pushed to the client + + , + e_STAT_PUT + // Value: Accumulated bytes of all messages ever received from + // the client + // Increments: Number of messages ever received from the client + }; + }; + private: // NOT IMPLEMENTED ClusterNodeStats(const ClusterNodeStats&) BSLS_CPP11_DELETED; @@ -341,7 +364,8 @@ class ClusterNodeStats { /// Update statistics for the event of the specified `type` and with the /// specified `value` (depending on the `type`, `value` can represent /// the number of bytes, a counter, ... - void onEvent(EventType::Enum type, bsls::Types::Int64 value); + template + void onEvent(bsls::Types::Int64 value); /// Return a pointer to the statcontext. bmqst::StatContext* statContext(); @@ -393,6 +417,38 @@ inline bmqst::StatContext* ClusterNodeStats::statContext() return d_statContext_mp.get(); } +template <> +inline void +ClusterNodeStats::onEvent( + BSLS_ANNOTATION_UNUSED bsls::Types::Int64 value) +{ + d_statContext_mp->adjustValue(ClusterNodeStatsIndex::e_STAT_ACK, 1); +} + +template <> +inline void +ClusterNodeStats::onEvent( + BSLS_ANNOTATION_UNUSED bsls::Types::Int64 value) +{ + d_statContext_mp->adjustValue(ClusterNodeStatsIndex::e_STAT_CONFIRM, 1); +} + +template <> +inline void +ClusterNodeStats::onEvent( + bsls::Types::Int64 value) +{ + d_statContext_mp->adjustValue(ClusterNodeStatsIndex::e_STAT_PUSH, value); +} + +template <> +inline void +ClusterNodeStats::onEvent( + bsls::Types::Int64 value) +{ + d_statContext_mp->adjustValue(ClusterNodeStatsIndex::e_STAT_PUT, value); +} + } // close package namespace } // close enterprise namespace diff --git a/src/groups/mqb/mqbstat/mqbstat_domainstats.cpp b/src/groups/mqb/mqbstat/mqbstat_domainstats.cpp index afbd25e888..5b0b5a31d5 100644 --- a/src/groups/mqb/mqbstat/mqbstat_domainstats.cpp +++ b/src/groups/mqb/mqbstat/mqbstat_domainstats.cpp @@ -42,16 +42,6 @@ namespace { /// Name of the stat context to create (holding all domain's statistics) static const char k_DOMAIN_STAT_NAME[] = "domains"; -//------------------------ -// struct DomainStatsIndex -//------------------------ - -/// Namespace for the constants of stat values that applies to the queues -/// from the clients -struct DomainStatsIndex { - enum Enum { e_STAT_CFG_MSGS, e_STAT_CFG_BYTES, e_STAT_QUEUE_COUNT }; -}; - } // close unnamed namespace // ------------------ @@ -143,27 +133,6 @@ void DomainStats::initialize(mqbi::Domain* domain, datum->adopt(builder.commit()); } -void DomainStats::onEvent(EventType::Enum type, bsls::Types::Int64 value) -{ - BSLS_ASSERT_SAFE(d_statContext_mp && "initialize was not called"); - - switch (type) { - case EventType::e_CFG_MSGS: { - d_statContext_mp->setValue(DomainStatsIndex::e_STAT_CFG_MSGS, value); - } break; - case EventType::e_CFG_BYTES: { - d_statContext_mp->setValue(DomainStatsIndex::e_STAT_CFG_BYTES, value); - } break; - case EventType::e_QUEUE_COUNT: { - d_statContext_mp->setValue(DomainStatsIndex::e_STAT_QUEUE_COUNT, - value); - } break; - default: { - BSLS_ASSERT_SAFE(false && "Unknown event type"); - } break; - }; -} - // --------------------- // class DomainStatsUtil // --------------------- diff --git a/src/groups/mqb/mqbstat/mqbstat_domainstats.h b/src/groups/mqb/mqbstat/mqbstat_domainstats.h index 91c7e96fa4..8f19a4d819 100644 --- a/src/groups/mqb/mqbstat/mqbstat_domainstats.h +++ b/src/groups/mqb/mqbstat/mqbstat_domainstats.h @@ -28,6 +28,7 @@ // exposing methods to initialize the stat contexts. // MQB +#include // BDE #include @@ -43,10 +44,6 @@ namespace BloombergLP { namespace mqbi { class Domain; } -namespace bmqst { -class StatContext; -} - namespace mqbstat { // ================= @@ -77,6 +74,14 @@ class DomainStats { bslma::ManagedPtr d_statContext_mp; // StatContext + // PRIVATE TYPES + + /// Namespace for the constants of stat values that applies to the queues + /// from the clients + struct DomainStatsIndex { + enum Enum { e_STAT_CFG_MSGS, e_STAT_CFG_BYTES, e_STAT_QUEUE_COUNT }; + }; + private: // NOT IMPLEMENTED DomainStats(const DomainStats&) BSLS_CPP11_DELETED; @@ -115,7 +120,8 @@ class DomainStats { /// Update statistics for the event of the specified `type` and with the /// specified `value` (depending on the `type`, `value` can represent /// the number of bytes, a counter, ... - void onEvent(EventType::Enum type, bsls::Types::Int64 value); + template + void onEvent(bsls::Types::Int64 value); /// Return a pointer to the statcontext. bmqst::StatContext* statContext(); @@ -150,6 +156,30 @@ inline bmqst::StatContext* DomainStats::statContext() return d_statContext_mp.get(); } +template <> +inline void DomainStats::onEvent( + bsls::Types::Int64 value) +{ + BSLS_ASSERT_SAFE(d_statContext_mp && "initialize was not called"); + d_statContext_mp->setValue(DomainStatsIndex::e_STAT_CFG_MSGS, value); +} + +template <> +inline void DomainStats::onEvent( + bsls::Types::Int64 value) +{ + BSLS_ASSERT_SAFE(d_statContext_mp && "initialize was not called"); + d_statContext_mp->setValue(DomainStatsIndex::e_STAT_CFG_BYTES, value); +} + +template <> +inline void DomainStats::onEvent( + bsls::Types::Int64 value) +{ + BSLS_ASSERT_SAFE(d_statContext_mp && "initialize was not called"); + d_statContext_mp->setValue(DomainStatsIndex::e_STAT_QUEUE_COUNT, value); +} + } // close package namespace } // close enterprise namespace diff --git a/src/groups/mqb/mqbstat/mqbstat_queuestats.cpp b/src/groups/mqb/mqbstat/mqbstat_queuestats.cpp index 7943f4b57e..ab0e36b8e2 100644 --- a/src/groups/mqb/mqbstat/mqbstat_queuestats.cpp +++ b/src/groups/mqb/mqbstat/mqbstat_queuestats.cpp @@ -63,104 +63,6 @@ const int k_MAX_INSTANT_MESSAGES = 10; const bsls::Types::Int64 k_NS_PER_MESSAGE = bdlt::TimeUnitRatio::k_NANOSECONDS_PER_MINUTE / k_MAX_INSTANT_MESSAGES; -// ----------------------- -// struct DomainQueueStats -// ----------------------- - -/// Namespace for the constants of stat values that applies to the queues on -/// the domain -struct DomainQueueStats { - enum Enum { - /// Value: Current number of clients who opened the queue with - /// the `WRITE` flag - e_STAT_NB_PRODUCER - - , - /// Value: Current number of clients who opened the queue with - /// the 'READ' flag - e_STAT_NB_CONSUMER - - , - /// Value: Current number of messages in the queue - e_STAT_MESSAGES - - , - /// Value: Accumulated bytes of all messages currently in the - /// queue - e_STAT_BYTES - - , - /// Value: Number of ack messages delivered by this queue - e_STAT_ACK - - , - /// Value: The time between PUT and ACK (in nanoseconds). - e_STAT_ACK_TIME - - , - /// Value: Number of NACK messages generated for this queue - e_STAT_NACK - - , - /// Value: Number of CONFIRM messages received by this queue - e_STAT_CONFIRM - - , - /// Value: The time between PUSH and CONFIRM (in nanoseconds). - e_STAT_CONFIRM_TIME - - , - /// Value: Number of messages rejected by this queue (RDA - /// reaching zero) - e_STAT_REJECT - - , - /// Value: The time spent by the message in the queue (in - /// nanoseconds). - e_STAT_QUEUE_TIME - - , - /// Value: Accumulated bytes of all messages ever pushed from - /// the queue - /// Increment: Number of messages ever pushed from the queue - e_STAT_PUSH - - , - /// Value: Accumulated bytes of all messages ever put in the - /// queue - /// Increment: Number of messages ever put in the queue - e_STAT_PUT - - , - /// Value: Accumulated number of messages ever GC'ed in the - /// queue - e_STAT_GC_MSGS - - , - /// Value: Role (Unknown, Primary, Replica, Proxy) - e_STAT_ROLE - - , - /// Value: The configured queue messages capacity - e_CFG_MSGS - - , - /// Value: The configured queue bytes capacity - e_CFG_BYTES - - , - /// Value: Accumulated number of messages in the strong - /// consistency queue expired before receiving quorum - /// Receipts - e_STAT_NO_SC_MSGS - - , - // Value: Current number of GUIDs stored in queue's history - // (does not include messages in the queue) - e_STAT_HISTORY - }; -}; - // ------------------ // struct ClientStats // ------------------ @@ -512,101 +414,6 @@ QueueStatsDomain& QueueStatsDomain::setWriterCount(int writerCount) return *this; } -void QueueStatsDomain::onEvent(EventType::Enum type, bsls::Types::Int64 value) -{ - BSLS_ASSERT_SAFE(d_statContext_mp && "initialize was not called"); - - switch (type) { - case EventType::e_ACK: { - // For ACK, we don't have any bytes value, but we also wouldn't care .. - d_statContext_mp->adjustValue(DomainQueueStats::e_STAT_ACK, 1); - } break; - case EventType::e_ACK_TIME: { - d_statContext_mp->reportValue(DomainQueueStats::e_STAT_ACK_TIME, - value); - } break; - case EventType::e_NACK: { - // For NACK, we don't care about the bytes value .. - d_statContext_mp->adjustValue(DomainQueueStats::e_STAT_NACK, 1); - } break; - case EventType::e_CONFIRM: { - // For CONFIRM, we don't care about the bytes value .. - d_statContext_mp->adjustValue(DomainQueueStats::e_STAT_CONFIRM, 1); - } break; - case EventType::e_CONFIRM_TIME: { - d_statContext_mp->reportValue(DomainQueueStats::e_STAT_CONFIRM_TIME, - value); - } break; - case EventType::e_REJECT: { - d_statContext_mp->adjustValue(DomainQueueStats::e_STAT_REJECT, 1); - } break; - case EventType::e_QUEUE_TIME: { - d_statContext_mp->reportValue(DomainQueueStats::e_STAT_QUEUE_TIME, - value); - } break; - case EventType::e_PUSH: { - d_statContext_mp->adjustValue(DomainQueueStats::e_STAT_PUSH, value); - } break; - case EventType::e_PUT: { - d_statContext_mp->adjustValue(DomainQueueStats::e_STAT_PUT, value); - } break; - case EventType::e_ADD_MESSAGE: { - d_statContext_mp->adjustValue(DomainQueueStats::e_STAT_BYTES, value); - d_statContext_mp->adjustValue(DomainQueueStats::e_STAT_MESSAGES, 1); - if (!d_subContextsHolder.empty()) { - bsl::list::iterator it = - d_subContextsHolder.begin(); - while (it != d_subContextsHolder.end()) { - it->get()->adjustValue(DomainQueueStats::e_STAT_BYTES, value); - it->get()->adjustValue(DomainQueueStats::e_STAT_MESSAGES, 1); - ++it; - } - } - } break; - case EventType::e_DEL_MESSAGE: { - d_statContext_mp->adjustValue(DomainQueueStats::e_STAT_BYTES, -value); - d_statContext_mp->adjustValue(DomainQueueStats::e_STAT_MESSAGES, -1); - } break; - case EventType::e_GC_MESSAGE: { - d_statContext_mp->adjustValue(DomainQueueStats::e_STAT_GC_MSGS, value); - } break; - case EventType::e_PURGE: { - // NOTE: Setting the value like that will cause weird results if using - // the stat to get rates - d_statContext_mp->setValue(DomainQueueStats::e_STAT_BYTES, 0); - d_statContext_mp->setValue(DomainQueueStats::e_STAT_MESSAGES, 0); - if (!d_subContextsHolder.empty()) { - bsl::list::iterator it = - d_subContextsHolder.begin(); - while (it != d_subContextsHolder.end()) { - it->get()->setValue(DomainQueueStats::e_STAT_BYTES, 0); - it->get()->setValue(DomainQueueStats::e_STAT_MESSAGES, 0); - ++it; - } - } - } break; - case EventType::e_CHANGE_ROLE: { - d_statContext_mp->setValue(DomainQueueStats::e_STAT_ROLE, value); - } break; - case EventType::e_CFG_MSGS: { - d_statContext_mp->setValue(DomainQueueStats::e_CFG_MSGS, value); - } break; - case EventType::e_CFG_BYTES: { - d_statContext_mp->setValue(DomainQueueStats::e_CFG_BYTES, value); - } break; - case EventType::e_NO_SC_MESSAGE: { - d_statContext_mp->adjustValue(DomainQueueStats::e_STAT_NO_SC_MSGS, - value); - } break; - case EventType::e_UPDATE_HISTORY: { - d_statContext_mp->setValue(DomainQueueStats::e_STAT_HISTORY, value); - } break; - default: { - BSLS_ASSERT_SAFE(false && "Unknown event type"); - } break; - }; -} - void QueueStatsDomain::onEvent(EventType::Enum type, bsls::Types::Int64 value, const bsl::string& appId) diff --git a/src/groups/mqb/mqbstat/mqbstat_queuestats.h b/src/groups/mqb/mqbstat/mqbstat_queuestats.h index 7d4a389b8f..d6fcd8e5b6 100644 --- a/src/groups/mqb/mqbstat/mqbstat_queuestats.h +++ b/src/groups/mqb/mqbstat/mqbstat_queuestats.h @@ -258,7 +258,8 @@ class QueueStatsDomain { /// Update statistics for the event of the specified `type` and with the /// specified `value`. Depending on the `type`, `value` can represent /// the number of bytes, a counter, ... - void onEvent(EventType::Enum type, bsls::Types::Int64 value); + template + void onEvent(bsls::Types::Int64 value); /// Update statistics for the event of the specified `type` and with the /// specified `value` for the specified `appId`. Depending on the `type`, @@ -414,6 +415,104 @@ struct QueueStatsUtil { bmqst::StatContext* statContext); }; +// ----------------------- +// struct DomainQueueStats +// ----------------------- + +/// Namespace for the constants of stat values that applies to the queues +/// on the domain +struct DomainQueueStats { + enum Enum { + /// Value: Current number of clients who opened the queue with + /// the `WRITE` flag + e_STAT_NB_PRODUCER + + , + /// Value: Current number of clients who opened the queue with + /// the 'READ' flag + e_STAT_NB_CONSUMER + + , + /// Value: Current number of messages in the queue + e_STAT_MESSAGES + + , + /// Value: Accumulated bytes of all messages currently in the + /// queue + e_STAT_BYTES + + , + /// Value: Number of ack messages delivered by this queue + e_STAT_ACK + + , + /// Value: The time between PUT and ACK (in nanoseconds). + e_STAT_ACK_TIME + + , + /// Value: Number of NACK messages generated for this queue + e_STAT_NACK + + , + /// Value: Number of CONFIRM messages received by this queue + e_STAT_CONFIRM + + , + /// Value: The time between PUSH and CONFIRM (in nanoseconds). + e_STAT_CONFIRM_TIME + + , + /// Value: Number of messages rejected by this queue (RDA + /// reaching zero) + e_STAT_REJECT + + , + /// Value: The time spent by the message in the queue (in + /// nanoseconds). + e_STAT_QUEUE_TIME + + , + /// Value: Accumulated bytes of all messages ever pushed from + /// the queue + /// Increment: Number of messages ever pushed from the queue + e_STAT_PUSH + + , + /// Value: Accumulated bytes of all messages ever put in the + /// queue + /// Increment: Number of messages ever put in the queue + e_STAT_PUT + + , + /// Value: Accumulated number of messages ever GC'ed in the + /// queue + e_STAT_GC_MSGS + + , + /// Value: Role (Unknown, Primary, Replica, Proxy) + e_STAT_ROLE + + , + /// Value: The configured queue messages capacity + e_CFG_MSGS + + , + /// Value: The configured queue bytes capacity + e_CFG_BYTES + + , + /// Value: Accumulated number of messages in the strong + /// consistency queue expired before receiving quorum + /// Receipts + e_STAT_NO_SC_MSGS + + , + // Value: Current number of GUIDs stored in queue's history + // (does not include messages in the queue) + e_STAT_HISTORY + }; +}; + // ============================================================================ // INLINE DEFINITIONS // ============================================================================ @@ -427,6 +526,193 @@ inline bmqst::StatContext* QueueStatsDomain::statContext() return d_statContext_mp.get(); } +template <> +inline void +QueueStatsDomain::onEvent( + BSLS_ANNOTATION_UNUSED bsls::Types::Int64 value) +{ + BSLS_ASSERT_SAFE(d_statContext_mp && "initialize was not called"); + d_statContext_mp->adjustValue(DomainQueueStats::e_STAT_ACK, 1); +} + +template <> +inline void +QueueStatsDomain::onEvent( + bsls::Types::Int64 value) +{ + BSLS_ASSERT_SAFE(d_statContext_mp && "initialize was not called"); + d_statContext_mp->reportValue(DomainQueueStats::e_STAT_ACK_TIME, value); +} + +template <> +inline void +QueueStatsDomain::onEvent( + BSLS_ANNOTATION_UNUSED bsls::Types::Int64 value) +{ + BSLS_ASSERT_SAFE(d_statContext_mp && "initialize was not called"); + // For NACK, we don't care about the bytes value + d_statContext_mp->adjustValue(DomainQueueStats::e_STAT_NACK, 1); +} + +template <> +inline void +QueueStatsDomain::onEvent( + BSLS_ANNOTATION_UNUSED bsls::Types::Int64 value) +{ + BSLS_ASSERT_SAFE(d_statContext_mp && "initialize was not called"); + // For CONFIRM, we don't care about the bytes value + d_statContext_mp->adjustValue(DomainQueueStats::e_STAT_CONFIRM, 1); +} + +template <> +inline void +QueueStatsDomain::onEvent( + bsls::Types::Int64 value) +{ + BSLS_ASSERT_SAFE(d_statContext_mp && "initialize was not called"); + d_statContext_mp->reportValue(DomainQueueStats::e_STAT_CONFIRM_TIME, + value); +} + +template <> +inline void +QueueStatsDomain::onEvent( + BSLS_ANNOTATION_UNUSED bsls::Types::Int64 value) +{ + BSLS_ASSERT_SAFE(d_statContext_mp && "initialize was not called"); + // For REJECT, we don't care about the bytes value + d_statContext_mp->adjustValue(DomainQueueStats::e_STAT_REJECT, 1); +} + +template <> +inline void +QueueStatsDomain::onEvent( + bsls::Types::Int64 value) +{ + BSLS_ASSERT_SAFE(d_statContext_mp && "initialize was not called"); + d_statContext_mp->reportValue(DomainQueueStats::e_STAT_QUEUE_TIME, value); +} + +template <> +inline void +QueueStatsDomain::onEvent( + bsls::Types::Int64 value) +{ + BSLS_ASSERT_SAFE(d_statContext_mp && "initialize was not called"); + d_statContext_mp->adjustValue(DomainQueueStats::e_STAT_PUSH, value); +} + +template <> +inline void +QueueStatsDomain::onEvent( + bsls::Types::Int64 value) +{ + BSLS_ASSERT_SAFE(d_statContext_mp && "initialize was not called"); + d_statContext_mp->adjustValue(DomainQueueStats::e_STAT_PUT, value); +} + +template <> +inline void +QueueStatsDomain::onEvent( + bsls::Types::Int64 value) +{ + BSLS_ASSERT_SAFE(d_statContext_mp && "initialize was not called"); + d_statContext_mp->adjustValue(DomainQueueStats::e_STAT_BYTES, value); + d_statContext_mp->adjustValue(DomainQueueStats::e_STAT_MESSAGES, 1); + if (!d_subContextsHolder.empty()) { + bsl::list::iterator it = d_subContextsHolder.begin(); + while (it != d_subContextsHolder.end()) { + it->get()->adjustValue(DomainQueueStats::e_STAT_BYTES, value); + it->get()->adjustValue(DomainQueueStats::e_STAT_MESSAGES, 1); + ++it; + } + } +} + +template <> +inline void +QueueStatsDomain::onEvent( + bsls::Types::Int64 value) +{ + BSLS_ASSERT_SAFE(d_statContext_mp && "initialize was not called"); + d_statContext_mp->adjustValue(DomainQueueStats::e_STAT_BYTES, -value); + d_statContext_mp->adjustValue(DomainQueueStats::e_STAT_MESSAGES, -1); +} + +template <> +inline void +QueueStatsDomain::onEvent( + bsls::Types::Int64 value) +{ + BSLS_ASSERT_SAFE(d_statContext_mp && "initialize was not called"); + d_statContext_mp->adjustValue(DomainQueueStats::e_STAT_GC_MSGS, value); +} + +template <> +inline void +QueueStatsDomain::onEvent( + BSLS_ANNOTATION_UNUSED bsls::Types::Int64 value) +{ + BSLS_ASSERT_SAFE(d_statContext_mp && "initialize was not called"); + // NOTE: Setting the value like that will cause weird results if using + // the stat to get rates + d_statContext_mp->setValue(DomainQueueStats::e_STAT_BYTES, 0); + d_statContext_mp->setValue(DomainQueueStats::e_STAT_MESSAGES, 0); + if (!d_subContextsHolder.empty()) { + bsl::list::iterator it = d_subContextsHolder.begin(); + while (it != d_subContextsHolder.end()) { + it->get()->setValue(DomainQueueStats::e_STAT_BYTES, 0); + it->get()->setValue(DomainQueueStats::e_STAT_MESSAGES, 0); + ++it; + } + } +} + +template <> +inline void +QueueStatsDomain::onEvent( + bsls::Types::Int64 value) +{ + BSLS_ASSERT_SAFE(d_statContext_mp && "initialize was not called"); + d_statContext_mp->setValue(DomainQueueStats::e_STAT_ROLE, value); +} + +template <> +inline void +QueueStatsDomain::onEvent( + bsls::Types::Int64 value) +{ + BSLS_ASSERT_SAFE(d_statContext_mp && "initialize was not called"); + d_statContext_mp->setValue(DomainQueueStats::e_CFG_MSGS, value); +} + +template <> +inline void +QueueStatsDomain::onEvent( + bsls::Types::Int64 value) +{ + BSLS_ASSERT_SAFE(d_statContext_mp && "initialize was not called"); + d_statContext_mp->setValue(DomainQueueStats::e_CFG_BYTES, value); +} + +template <> +inline void +QueueStatsDomain::onEvent( + bsls::Types::Int64 value) +{ + BSLS_ASSERT_SAFE(d_statContext_mp && "initialize was not called"); + d_statContext_mp->adjustValue(DomainQueueStats::e_STAT_NO_SC_MSGS, value); +} + +template <> +inline void +QueueStatsDomain::onEvent( + bsls::Types::Int64 value) +{ + BSLS_ASSERT_SAFE(d_statContext_mp && "initialize was not called"); + d_statContext_mp->setValue(DomainQueueStats::e_STAT_HISTORY, value); +} + // ----------------------------- // struct QueueStatsDomain::Role // ----------------------------- diff --git a/src/groups/mqb/mqbstat/mqbstat_queuestats.t.cpp b/src/groups/mqb/mqbstat/mqbstat_queuestats.t.cpp index 65590a1506..e06c54d5e5 100644 --- a/src/groups/mqb/mqbstat/mqbstat_queuestats.t.cpp +++ b/src/groups/mqb/mqbstat/mqbstat_queuestats.t.cpp @@ -314,25 +314,25 @@ static void test3_queueStatsDomain() queueStatsDomain.setWriterCount(2); // 2 acks : bytes irrelevant - queueStatsDomain.onEvent(QueueStatsDomain::EventType::e_ACK, k_DUMMY); - queueStatsDomain.onEvent(QueueStatsDomain::EventType::e_ACK, k_DUMMY); + queueStatsDomain.onEvent(k_DUMMY); + queueStatsDomain.onEvent(k_DUMMY); // 1 confirm : bytes irrelevant - queueStatsDomain.onEvent(QueueStatsDomain::EventType::e_CONFIRM, k_DUMMY); + queueStatsDomain.onEvent(k_DUMMY); // 1 push : 9 bytes - queueStatsDomain.onEvent(QueueStatsDomain::EventType::e_PUSH, 9); + queueStatsDomain.onEvent(9); // 3 puts : 33 bytes - queueStatsDomain.onEvent(QueueStatsDomain::EventType::e_PUT, 10); - queueStatsDomain.onEvent(QueueStatsDomain::EventType::e_PUT, 11); - queueStatsDomain.onEvent(QueueStatsDomain::EventType::e_PUT, 12); + queueStatsDomain.onEvent(10); + queueStatsDomain.onEvent(11); + queueStatsDomain.onEvent(12); // 1 add message : 15 bytes - queueStatsDomain.onEvent(QueueStatsDomain::EventType::e_ADD_MESSAGE, 15); + queueStatsDomain.onEvent(15); // 1 GUID in history - queueStatsDomain.onEvent(QueueStatsDomain::EventType::e_UPDATE_HISTORY, 1); + queueStatsDomain.onEvent(1); domain->snapshot(); // The following stats are not range based, and therefore always return the @@ -363,29 +363,29 @@ static void test3_queueStatsDomain() queueStatsDomain.setReaderCount(3).setWriterCount(1); // 4 acks : bytes irrelevant - queueStatsDomain.onEvent(QueueStatsDomain::EventType::e_ACK, k_DUMMY); - queueStatsDomain.onEvent(QueueStatsDomain::EventType::e_ACK, k_DUMMY); - queueStatsDomain.onEvent(QueueStatsDomain::EventType::e_ACK, k_DUMMY); - queueStatsDomain.onEvent(QueueStatsDomain::EventType::e_ACK, k_DUMMY); + queueStatsDomain.onEvent(k_DUMMY); + queueStatsDomain.onEvent(k_DUMMY); + queueStatsDomain.onEvent(k_DUMMY); + queueStatsDomain.onEvent(k_DUMMY); // 3 confirms : bytes irrelevant - queueStatsDomain.onEvent(QueueStatsDomain::EventType::e_CONFIRM, k_DUMMY); - queueStatsDomain.onEvent(QueueStatsDomain::EventType::e_CONFIRM, k_DUMMY); - queueStatsDomain.onEvent(QueueStatsDomain::EventType::e_CONFIRM, k_DUMMY); + queueStatsDomain.onEvent(k_DUMMY); + queueStatsDomain.onEvent(k_DUMMY); + queueStatsDomain.onEvent(k_DUMMY); // 1 push : 9 bytes - queueStatsDomain.onEvent(QueueStatsDomain::EventType::e_PUSH, 11); + queueStatsDomain.onEvent(11); // 2 puts : 22 bytes - queueStatsDomain.onEvent(QueueStatsDomain::EventType::e_PUT, 10); - queueStatsDomain.onEvent(QueueStatsDomain::EventType::e_PUT, 12); + queueStatsDomain.onEvent(10); + queueStatsDomain.onEvent(12); // del 1 message - queueStatsDomain.onEvent(QueueStatsDomain::EventType::e_DEL_MESSAGE, 15); + queueStatsDomain.onEvent(15); // 3 GUIDs in history (first 5, then gc results in 3) - queueStatsDomain.onEvent(QueueStatsDomain::EventType::e_UPDATE_HISTORY, 5); - queueStatsDomain.onEvent(QueueStatsDomain::EventType::e_UPDATE_HISTORY, 3); + queueStatsDomain.onEvent(5); + queueStatsDomain.onEvent(3); domain->snapshot(); // The following stats are not range based, and therefore always return the @@ -474,8 +474,8 @@ static void test4_queueStatsDomainContent() } { - obj.onEvent(mqbstat::QueueStatsDomain::EventType::e_ADD_MESSAGE, 3); - obj.onEvent(mqbstat::QueueStatsDomain::EventType::e_ADD_MESSAGE, 5); + obj.onEvent(3); + obj.onEvent(5); sc->snapshot(); @@ -487,8 +487,8 @@ static void test4_queueStatsDomainContent() } { - obj.onEvent(mqbstat::QueueStatsDomain::EventType::e_ADD_MESSAGE, 7); - obj.onEvent(mqbstat::QueueStatsDomain::EventType::e_DEL_MESSAGE, 3); + obj.onEvent(7); + obj.onEvent(3); sc->snapshot(); @@ -500,7 +500,7 @@ static void test4_queueStatsDomainContent() } { - obj.onEvent(mqbstat::QueueStatsDomain::EventType::e_DEL_MESSAGE, 5); + obj.onEvent(5); sc->snapshot();