Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Perf[MQB]: inline stats onEvent #542

Merged
merged 15 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 12 additions & 10 deletions src/groups/mqb/mqba/mqba_clientsession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -584,9 +584,11 @@ void ClientSession::sendAck(bmqt::AckResult::Enum status,

// If queue is found, report locally generated NACK
if (isSelfGenerated) {
queueState->d_handle_p->queue()->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_NACK,
1);
queueState->d_handle_p->queue()
->stats()
->onEvent<mqbstat::QueueStatsDomain::EventType::e_NACK>(

1);
}
}

Expand Down Expand Up @@ -1658,9 +1660,9 @@ void ClientSession::onAckEvent(const mqbi::DispatcherAckEvent& event)
// Calculate time delta between PUT and ACK
const bsls::Types::Int64 timeDelta =
bmqsys::Time::highResolutionTimer() - cit->second.d_timeStamp;
queue->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_ACK_TIME,
timeDelta);
queue->stats()
->onEvent<mqbstat::QueueStatsDomain::EventType::e_ACK_TIME>(
timeDelta);

if (!d_isClientGeneratingGUIDs) {
// Legacy client.
Expand Down Expand Up @@ -2693,8 +2695,8 @@ ClientSession::ClientSession(
this,
bdlf::PlaceHolders::_1)); // type

mqbstat::BrokerStats::instance().onEvent(
mqbstat::BrokerStats::EventType::e_CLIENT_CREATED);
mqbstat::BrokerStats::instance()
.onEvent<mqbstat::BrokerStats::EventType::e_CLIENT_CREATED>();

BALL_LOG_INFO << description() << ": created "
<< "[dispatcherProcessor: " << processor
Expand All @@ -2714,8 +2716,8 @@ ClientSession::~ClientSession()

BALL_LOG_INFO << description() << ": destructor";

mqbstat::BrokerStats::instance().onEvent(
mqbstat::BrokerStats::EventType::e_CLIENT_DESTROYED);
mqbstat::BrokerStats::instance()
.onEvent<mqbstat::BrokerStats::EventType::e_CLIENT_DESTROYED>();

// Unregister from the dispatcher
dispatcher()->unregisterClient(this);
Expand Down
35 changes: 19 additions & 16 deletions src/groups/mqb/mqbblp/mqbblp_cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -418,9 +418,9 @@ void Cluster::sendAck(bmqt::AckResult::Enum status,

// If queue exists, report self generated NACK
if (isSelfGenerated) {
it->second.d_handle_p->queue()->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_NACK,
1);
it->second.d_handle_p->queue()
->stats()
->onEvent<mqbstat::QueueStatsDomain::EventType::e_NACK>(1);
}
}
else if (!isSelfGenerated) {
Expand Down Expand Up @@ -467,9 +467,9 @@ void Cluster::sendAck(bmqt::AckResult::Enum status,
cit->second.d_subQueueInfosMap.findBySubIdSafe(
bmqp::QueueId::k_DEFAULT_SUBQUEUE_ID);
if (subQueueCiter != cit->second.d_subQueueInfosMap.end()) {
subQueueCiter->value().d_clientStats->onEvent(
mqbstat::ClusterNodeStats::EventType::e_ACK,
1);
subQueueCiter->value()
.d_clientStats
->onEvent<mqbstat::ClusterNodeStats::EventType::e_ACK>(1);
}
// In the case of Strong Consistency, a Receipt can arrive and trigger
// an ACK after Producer closes subStream.
Expand Down Expand Up @@ -548,7 +548,7 @@ void Cluster::generateNack(bmqt::AckResult::Enum status,
options);

// Report locally generated NACK
queue->stats()->onEvent(mqbstat::QueueStatsDomain::EventType::e_NACK, 1);
queue->stats()->onEvent<mqbstat::QueueStatsDomain::EventType::e_NACK>(1);

bmqu::MemOutStream os;
os << description() << ": Failed to relay PUT message "
Expand Down Expand Up @@ -1206,9 +1206,11 @@ void Cluster::onPutEvent(const mqbi::DispatcherPutEvent& event)
queueState.d_subQueueInfosMap.findBySubId(
bmqp::QueueId::k_DEFAULT_SUBQUEUE_ID);

subQueueCiter->value().d_clientStats->onEvent(
mqbstat::ClusterNodeStats::EventType::e_PUT,
appDataSp->length());
subQueueCiter->value()
.d_clientStats
->onEvent<mqbstat::ClusterNodeStats::EventType::e_PUT>(

appDataSp->length());

// TBD: groupId: Similar to 'appDataSp' above, load 'optionsSp' here,
// using something like PutMessageIterator::loadOptions().
Expand Down Expand Up @@ -1644,9 +1646,9 @@ Cluster::validateMessage(mqbi::QueueHandle** queueHandle,

if (eventType == bmqp::EventType::e_CONFIRM) {
// Update client stats
subQueueIt->value().d_clientStats->onEvent(
mqbstat::ClusterNodeStats::EventType::e_CONFIRM,
1);
subQueueIt->value()
.d_clientStats
->onEvent<mqbstat::ClusterNodeStats::EventType::e_CONFIRM>(1);
}

return ValidationResult::k_SUCCESS;
Expand Down Expand Up @@ -1912,9 +1914,10 @@ void Cluster::onPushEvent(const mqbi::DispatcherPushEvent& event)
queueState.d_subQueueInfosMap.findBySubscriptionId(
event.subQueueInfos()[i].id());

subQueueCiter->value().d_clientStats->onEvent(
mqbstat::ClusterNodeStats::EventType::e_PUSH,
event.blob() ? event.blob()->length() : 0);
subQueueCiter->value()
.d_clientStats
->onEvent<mqbstat::ClusterNodeStats::EventType::e_PUSH>(
event.blob() ? event.blob()->length() : 0);
}

bmqt::GenericResult::Enum rc = bmqt::GenericResult::e_SUCCESS;
Expand Down
8 changes: 4 additions & 4 deletions src/groups/mqb/mqbblp/mqbblp_domain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -430,10 +430,10 @@ int Domain::configure(bsl::ostream& errorDescription,
d_capacityMeter.setLimits(limits.messages(), limits.bytes())
.setWatermarkThresholds(limits.messagesWatermarkRatio(),
limits.bytesWatermarkRatio());
d_domainsStats.onEvent(mqbstat::DomainStats::EventType::e_CFG_MSGS,
limits.messages());
d_domainsStats.onEvent(mqbstat::DomainStats::EventType::e_CFG_BYTES,
limits.bytes());
d_domainsStats.onEvent<mqbstat::DomainStats::EventType::e_CFG_MSGS>(
limits.messages());
d_domainsStats.onEvent<mqbstat::DomainStats::EventType::e_CFG_BYTES>(
limits.bytes());

if (isReconfigure) {
BSLS_ASSERT_OPT(oldConfig.has_value());
Expand Down
44 changes: 21 additions & 23 deletions src/groups/mqb/mqbblp/mqbblp_localqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,17 +173,17 @@ int LocalQueue::configure(bsl::ostream& errorDescription, bool isReconfigure)
d_state_p->uri(),
d_state_p->partitionId());

d_state_p->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_CHANGE_ROLE,
mqbstat::QueueStatsDomain::Role::e_PRIMARY);
d_state_p->stats()
->onEvent<mqbstat::QueueStatsDomain::EventType::e_CHANGE_ROLE>(
mqbstat::QueueStatsDomain::Role::e_PRIMARY);

d_state_p->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_CFG_MSGS,
domainCfg.storage().queueLimits().messages());
d_state_p->stats()
->onEvent<mqbstat::QueueStatsDomain::EventType::e_CFG_MSGS>(
domainCfg.storage().queueLimits().messages());

d_state_p->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_CFG_BYTES,
domainCfg.storage().queueLimits().bytes());
d_state_p->stats()
->onEvent<mqbstat::QueueStatsDomain::EventType::e_CFG_BYTES>(
domainCfg.storage().queueLimits().bytes());

if (isReconfigure) {
if (domainCfg.mode().isFanoutValue()) {
Expand Down Expand Up @@ -482,9 +482,9 @@ void LocalQueue::postMessage(const bmqp::PutHeader& putHeader,
// Calculate time delta between PUT and ACK
const bsls::Types::Int64 timeDelta =
bmqsys::Time::highResolutionTimer() - timePoint;
d_state_p->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_ACK_TIME,
timeDelta);
d_state_p->stats()
->onEvent<mqbstat::QueueStatsDomain::EventType::e_ACK_TIME>(
timeDelta);
if (res != mqbi::StorageResult::e_SUCCESS || doAck) {
bmqp::AckMessage ackMessage;
ackMessage
Expand All @@ -509,9 +509,9 @@ void LocalQueue::postMessage(const bmqp::PutHeader& putHeader,
// flushed (which occurs in 'flush' routine). In no case should
// 'afterNewMessage' be called here.

d_state_p->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_PUT,
appData->length());
d_state_p->stats()
->onEvent<mqbstat::QueueStatsDomain::EventType::e_PUT>(
appData->length());
}
else {
BSLS_PERFORMANCEHINT_UNLIKELY_HINT;
Expand All @@ -525,9 +525,8 @@ void LocalQueue::postMessage(const bmqp::PutHeader& putHeader,
}
}
else {
d_state_p->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_NACK,
1);
d_state_p->stats()
->onEvent<mqbstat::QueueStatsDomain::EventType::e_NACK>(1);
}
}
}
Expand All @@ -548,9 +547,8 @@ void LocalQueue::onReceipt(const bmqt::MessageGUID& msgGUID,
const bsls::Types::Int64 timeDelta = bmqsys::Time::highResolutionTimer() -
arrivalTimepoint;

d_state_p->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_ACK_TIME,
timeDelta);
d_state_p->stats()
->onEvent<mqbstat::QueueStatsDomain::EventType::e_ACK_TIME>(timeDelta);

if (d_state_p->handleCatalog().hasHandle(qH)) {
// Send acknowledgement
Expand All @@ -572,8 +570,8 @@ void LocalQueue::onRemoval(const bmqt::MessageGUID& msgGUID,
// TODO: do we need to update NACK stats considering that downstream can
// NACK the same GUID as well?

d_state_p->stats()->onEvent(mqbstat::QueueStatsDomain::EventType::e_NACK,
1);
d_state_p->stats()->onEvent<mqbstat::QueueStatsDomain::EventType::e_NACK>(
1);

if (d_state_p->handleCatalog().hasHandle(qH)) {
// Send negative acknowledgement
Expand Down
6 changes: 3 additions & 3 deletions src/groups/mqb/mqbblp/mqbblp_queueengineutil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1452,9 +1452,9 @@ void QueueEngineUtil_AppState::reportStats(
message->attributes());

// First report 'queue time' metric for the entire queue
d_queue_p->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_QUEUE_TIME,
timeDelta);
d_queue_p->stats()
->onEvent<mqbstat::QueueStatsDomain::EventType::e_QUEUE_TIME>(
timeDelta);

// Then report 'queue time' metric for appId
d_queue_p->stats()->onEvent(
Expand Down
22 changes: 11 additions & 11 deletions src/groups/mqb/mqbblp/mqbblp_queuehandle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,9 +299,9 @@ QueueHandle::updateMonitor(const bsl::shared_ptr<Downstream>& subStream,
// The message exist in the storage, we likely are in situation
// (3), so it's a legit confirm and therefore, update the domain
// stats.
d_domainStats_p->onEvent(
mqbstat::QueueStatsDomain::EventType::e_CONFIRM,
msgSize);
d_domainStats_p
->onEvent<mqbstat::QueueStatsDomain::EventType::e_CONFIRM>(
msgSize);
}

BALL_LOG_INFO_BLOCK
Expand Down Expand Up @@ -404,15 +404,15 @@ mqbu::ResourceUsageMonitorStateTransition::Enum QueueHandle::updateMonitor(

if (type == bmqp::EventType::e_CONFIRM) {
// Update domain stats
d_domainStats_p->onEvent(
mqbstat::QueueStatsDomain::EventType::e_CONFIRM,
msgSize);
d_domainStats_p
->onEvent<mqbstat::QueueStatsDomain::EventType::e_CONFIRM>(
msgSize);

// Report CONFIRM time only at first hop
// Note that we update metric per entire queue and also per `appId`
if (d_clientContext_sp->isFirstHop()) {
d_domainStats_p->onEvent(
mqbstat::QueueStatsDomain::EventType::e_CONFIRM_TIME,
d_domainStats_p->onEvent<
mqbstat::QueueStatsDomain::EventType::e_CONFIRM_TIME>(
timeDelta);
d_domainStats_p->onEvent(
mqbstat::QueueStatsDomain::EventType::e_CONFIRM_TIME,
Expand Down Expand Up @@ -498,8 +498,8 @@ void QueueHandle::deliverMessageImpl(
BSLS_ASSERT_SAFE(subQueueInfos.size() >= 1 &&
subQueueInfos.size() <= d_subscriptions.size());

d_domainStats_p->onEvent(mqbstat::QueueStatsDomain::EventType::e_PUSH,
msgSize);
d_domainStats_p->onEvent<mqbstat::QueueStatsDomain::EventType::e_PUSH>(
msgSize);

// Create an event to dispatch delivery of the message to the client
mqbi::DispatcherClient* client = d_clientContext_sp->client();
Expand Down Expand Up @@ -1208,7 +1208,7 @@ void QueueHandle::onAckMessage(const bmqp::AckMessage& ackMessage)
ackMsg.setQueueId(id());

client->dispatcher()->dispatchEvent(event, client);
d_domainStats_p->onEvent(mqbstat::QueueStatsDomain::EventType::e_ACK, 1);
d_domainStats_p->onEvent<mqbstat::QueueStatsDomain::EventType::e_ACK>(1);
}

bool QueueHandle::canDeliver(unsigned int downstreamSubscriptionId) const
Expand Down
8 changes: 4 additions & 4 deletions src/groups/mqb/mqbblp/mqbblp_queuestate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,14 @@ QueueState::QueueState(mqbi::Queue* queue,

// NOTE: The 'description' will be set by the owner of this object.

mqbstat::BrokerStats::instance().onEvent(
mqbstat::BrokerStats::EventType::e_QUEUE_CREATED);
mqbstat::BrokerStats::instance()
.onEvent<mqbstat::BrokerStats::EventType::e_QUEUE_CREATED>();
}

QueueState::~QueueState()
{
mqbstat::BrokerStats::instance().onEvent(
mqbstat::BrokerStats::EventType::e_QUEUE_DESTROYED);
mqbstat::BrokerStats::instance()
.onEvent<mqbstat::BrokerStats::EventType::e_QUEUE_DESTROYED>();
}

void QueueState::add(const bmqp_ctrlmsg::QueueHandleParameters& params)
Expand Down
26 changes: 13 additions & 13 deletions src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,9 @@ int RemoteQueue::configureAsProxy(bsl::ostream& errorDescription,
return 10 * rc + rc_QUEUE_ENGINE_CFG_FAILURE; // RETURN
}

d_state_p->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_CHANGE_ROLE,
mqbstat::QueueStatsDomain::Role::e_PROXY);
d_state_p->stats()
->onEvent<mqbstat::QueueStatsDomain::EventType::e_CHANGE_ROLE>(
mqbstat::QueueStatsDomain::Role::e_PROXY);

BALL_LOG_INFO
<< "Created a ProxyRemoteQueue "
Expand Down Expand Up @@ -276,9 +276,9 @@ int RemoteQueue::configureAsClusterMember(bsl::ostream& errorDescription,
d_state_p->storageManager()->setQueueRaw(queue,
d_state_p->uri(),
d_state_p->partitionId());
d_state_p->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_CHANGE_ROLE,
mqbstat::QueueStatsDomain::Role::e_REPLICA);
d_state_p->stats()
->onEvent<mqbstat::QueueStatsDomain::EventType::e_CHANGE_ROLE>(
mqbstat::QueueStatsDomain::Role::e_REPLICA);

BALL_LOG_INFO << d_state_p->domain()->cluster()->name()
<< ": Created a ClusterMemberRemoteQueue "
Expand Down Expand Up @@ -922,9 +922,8 @@ void RemoteQueue::postMessage(const bmqp::PutHeader& putHeaderIn,
bmqt::AckResult::e_REFUSED));
ackMessage.setMessageGUID(putHeader.messageGUID());

d_state_p->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_NACK,
1);
d_state_p->stats()
->onEvent<mqbstat::QueueStatsDomain::EventType::e_NACK>(1);

// CorrelationId & QueueId are left unset as those fields
// will be filled downstream.
Expand Down Expand Up @@ -996,8 +995,9 @@ void RemoteQueue::postMessage(const bmqp::PutHeader& putHeaderIn,
// the time the message is actually sent upstream, i.e. in
// cluster/clusterProxy) for the most exact accuracy, but doing it here is
// good enough.
d_state_p->stats()->onEvent(mqbstat::QueueStatsDomain::EventType::e_PUT,
appData->length());

d_state_p->stats()->onEvent<mqbstat::QueueStatsDomain::EventType::e_PUT>(
appData->length());
}

void RemoteQueue::confirmMessage(const bmqt::MessageGUID& msgGUID,
Expand Down Expand Up @@ -1496,8 +1496,8 @@ RemoteQueue::Puts::iterator& RemoteQueue::nack(Puts::iterator& it,
{
ackMessage.setMessageGUID(it->first);

d_state_p->stats()->onEvent(mqbstat::QueueStatsDomain::EventType::e_NACK,
1);
d_state_p->stats()->onEvent<mqbstat::QueueStatsDomain::EventType::e_NACK>(
1);

// CorrelationId & QueueId are left unset as those fields
// will be filled downstream.
Expand Down
Loading
Loading