diff --git a/src/groups/mqb/mqba/mqba_application.cpp b/src/groups/mqb/mqba/mqba_application.cpp index 9df92b9078..1f027df965 100644 --- a/src/groups/mqb/mqba/mqba_application.cpp +++ b/src/groups/mqb/mqba/mqba_application.cpp @@ -535,11 +535,12 @@ Application::getRelevantCluster(const mqbcmd::CommandChoice& command, // attempt to locate the domain bsl::shared_ptr domainSp; - if (d_domainManager_mp->locateDomain(&domainSp, domainName) != 0) { + if (0 != + d_domainManager_mp->locateOrCreateDomain(&domainSp, domainName)) { mwcu::MemOutStream os; os << "Domain '" << domainName << "' doesn't exist"; cmdResult->makeError().message() = os.str(); - return nullptr; + return nullptr; // RETURN } return domainSp->cluster(); @@ -751,7 +752,7 @@ int Application::processCommand(const bslstl::StringRef& source, } // otherwise, this is an original call. utilize router if necessary - mqba::RouteCommandManager routeCommandManager(cmd, commandWithOptions); + mqba::CommandRouter routeCommandManager(cmd, commandWithOptions); bool shouldSelfExecute = true; bsl::string selfName; diff --git a/src/groups/mqb/mqba/mqba_commandrouter.cpp b/src/groups/mqb/mqba/mqba_commandrouter.cpp index 8a55bc7f9b..baa5e31c3a 100644 --- a/src/groups/mqb/mqba/mqba_commandrouter.cpp +++ b/src/groups/mqb/mqba/mqba_commandrouter.cpp @@ -1,4 +1,4 @@ -// Copyright 2014-2023 Bloomberg Finance L.P. +// Copyright 2024 Bloomberg Finance L.P. // SPDX-License-Identifier: Apache-2.0 // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -34,9 +34,8 @@ namespace { const char k_LOG_CATEGORY[] = "MQBA.COMMANDROUTER"; } // close unnamed namespace -RouteCommandManager::RouteCommandManager( - const bsl::string& commandString, - const mqbcmd::Command& commandWithOptions) +CommandRouter::CommandRouter(const bsl::string& commandString, + const mqbcmd::Command& commandWithOptions) : d_commandString(commandString) , d_commandWithOptions(commandWithOptions) , d_command(commandWithOptions.choice()) @@ -49,21 +48,21 @@ RouteCommandManager::RouteCommandManager( } } -RouteCommandManager::RoutingMode::RoutingMode() +CommandRouter::RoutingMode::RoutingMode() { } -RouteCommandManager::RoutingMode::~RoutingMode() +CommandRouter::RoutingMode::~RoutingMode() { } -RouteCommandManager::AllPartitionPrimariesRoutingMode:: +CommandRouter::AllPartitionPrimariesRoutingMode:: AllPartitionPrimariesRoutingMode() { } -RouteCommandManager::RouteMembers -RouteCommandManager::AllPartitionPrimariesRoutingMode::getRouteMembers( +CommandRouter::RouteMembers +CommandRouter::AllPartitionPrimariesRoutingMode::getRouteMembers( mqbi::Cluster* cluster) { NodesVector primaryNodes; @@ -81,14 +80,14 @@ RouteCommandManager::AllPartitionPrimariesRoutingMode::getRouteMembers( return {primaryNodes, isSelfPrimary}; } -RouteCommandManager::SinglePartitionPrimaryRoutingMode:: +CommandRouter::SinglePartitionPrimaryRoutingMode:: SinglePartitionPrimaryRoutingMode(int partitionId) : d_partitionId(partitionId) { } -RouteCommandManager::RouteMembers -RouteCommandManager::SinglePartitionPrimaryRoutingMode::getRouteMembers( +CommandRouter::RouteMembers +CommandRouter::SinglePartitionPrimaryRoutingMode::getRouteMembers( mqbi::Cluster* cluster) { mqbnet::ClusterNode* node = nullptr; @@ -113,13 +112,12 @@ RouteCommandManager::SinglePartitionPrimaryRoutingMode::getRouteMembers( return {nodes, isSelfPrimary}; } -RouteCommandManager::ClusterRoutingMode::ClusterRoutingMode() +CommandRouter::ClusterRoutingMode::ClusterRoutingMode() { } -RouteCommandManager::RouteMembers -RouteCommandManager::ClusterRoutingMode::getRouteMembers( - mqbi::Cluster* cluster) +CommandRouter::RouteMembers +CommandRouter::ClusterRoutingMode::getRouteMembers(mqbi::Cluster* cluster) { typedef mqbnet::Cluster::NodesList NodesList; // collect all nodes in cluster @@ -141,7 +139,7 @@ RouteCommandManager::ClusterRoutingMode::getRouteMembers( }; } -bool RouteCommandManager::route(mqbi::Cluster* relevantCluster) +bool CommandRouter::route(mqbi::Cluster* relevantCluster) { // PRECONDITIONS BSLS_ASSERT_SAFE(d_routingMode); @@ -155,9 +153,9 @@ bool RouteCommandManager::route(mqbi::Cluster* relevantCluster) RouteMembers routeMembers = d_routingMode->getRouteMembers( relevantCluster); - if (routeMembers.nodes.size() == 0) { + if (routeMembers.d_nodes.size() == 0) { countDownLatch(); - return routeMembers.self; + return routeMembers.d_self; } RequestContextSp contextSp = @@ -169,15 +167,15 @@ bool RouteCommandManager::route(mqbi::Cluster* relevantCluster) adminCommand.command() = d_commandString; adminCommand.rerouted() = true; - contextSp->setDestinationNodes(routeMembers.nodes); + contextSp->setDestinationNodes(routeMembers.d_nodes); mwcu::MemOutStream os; os << "Routing command to the following nodes ["; - for (NodesVector::const_iterator nit = routeMembers.nodes.begin(); - nit != routeMembers.nodes.end(); + for (NodesVector::const_iterator nit = routeMembers.d_nodes.begin(); + nit != routeMembers.d_nodes.end(); nit++) { os << (*nit)->hostName(); - if (nit + 1 != routeMembers.nodes.end()) { + if (nit + 1 != routeMembers.d_nodes.end()) { os << ", "; } } @@ -186,17 +184,17 @@ bool RouteCommandManager::route(mqbi::Cluster* relevantCluster) BALL_LOG_INFO << os.str(); contextSp->setResponseCb( - bdlf::BindUtil::bind(&RouteCommandManager::onRouteCommandResponse, + bdlf::BindUtil::bind(&CommandRouter::onRouteCommandResponse, this, bdlf::PlaceHolders::_1)); relevantCluster->multiRequestManager().sendRequest(contextSp, bsls::TimeInterval(3)); - return routeMembers.self; + return routeMembers.d_self; } -void RouteCommandManager::onRouteCommandResponse( +void CommandRouter::onRouteCommandResponse( const MultiRequestContextSp& requestContext) { typedef bsl::pair @@ -249,33 +247,27 @@ void RouteCommandManager::onRouteCommandResponse( countDownLatch(); } -RouteCommandManager::RoutingModeMp RouteCommandManager::getCommandRoutingMode() +CommandRouter::RoutingModeMp CommandRouter::getCommandRoutingMode() { - RouteCommandManager::RoutingMode* routingMode = nullptr; + RoutingMode* routingMode = nullptr; if (d_command.isDomainsValue()) { const mqbcmd::DomainsCommand& domains = d_command.domains(); if (domains.isDomainValue()) { const mqbcmd::DomainCommand& domain = domains.domain().command(); if (domain.isPurgeValue()) { - routingMode = - new AllPartitionPrimariesRoutingMode(); // DOMAINS DOMAIN - // PURGE + routingMode = new AllPartitionPrimariesRoutingMode(); + // DOMAINS DOMAIN PURGE } else if (domain.isQueueValue()) { if (domain.queue().command().isPurgeAppIdValue()) { - routingMode = - new AllPartitionPrimariesRoutingMode(); // DOMAINS - // DOMAIN - // - // QUEUE - // - // PURGE + routingMode = new AllPartitionPrimariesRoutingMode(); + // DOMAINS DOMAIN QUEUE PURGE } } } else if (domains.isReconfigureValue()) { - routingMode = - new ClusterRoutingMode(); // DOMAINS RECONFIGURE + routingMode = new ClusterRoutingMode(); + // DOMAINS RECONFIGURE } } else if (d_command.isClustersValue()) { @@ -284,10 +276,8 @@ RouteCommandManager::RoutingModeMp RouteCommandManager::getCommandRoutingMode() const mqbcmd::ClusterCommand& cluster = clusters.cluster().command(); if (cluster.isForceGcQueuesValue()) { - routingMode = - new AllPartitionPrimariesRoutingMode(); // CLUSTERS - // CLUSTER - // FORCE_GC_QUEUES + routingMode = new AllPartitionPrimariesRoutingMode(); + // CLUSTERS CLUSTER FORCE_GC_QUEUES } else if (cluster.isStorageValue()) { const mqbcmd::StorageCommand& storage = cluster.storage(); @@ -296,9 +286,9 @@ RouteCommandManager::RoutingModeMp RouteCommandManager::getCommandRoutingMode() storage.partition().command().isDisableValue()) { int partitionId = storage.partition().partitionId(); routingMode = new SinglePartitionPrimaryRoutingMode( - partitionId); // CLUSTERS CLUSTER STORAGE - // PARTITION - // [ENABLE|DISABLE] + partitionId); + // CLUSTERS CLUSTER STORAGE PARTITION + // [ENABLE|DISABLE] } // SUMMARY doesn't need to route to primary } @@ -309,22 +299,18 @@ RouteCommandManager::RoutingModeMp RouteCommandManager::getCommandRoutingMode() const mqbcmd::SetTunable& tunable = replication.setTunable(); if (tunable.choice().isAllValue()) { - routingMode = - new ClusterRoutingMode(); // CLUSTERS CLUSTER - // STORAGE - // REPLICATION - // SET_ALL + routingMode = new ClusterRoutingMode(); + // CLUSTERS CLUSTER STORAGE REPLICATION + // SET_ALL } } else if (replication.isGetTunableValue()) { const mqbcmd::GetTunable& tunable = replication.getTunable(); if (tunable.choice().isAllValue()) { - routingMode = - new ClusterRoutingMode(); // CLUSTERS CLUSTER - // STORAGE - // REPLICATION - // GET_ALL + routingMode = new ClusterRoutingMode(); + // CLUSTERS CLUSTER STORAGE REPLICATION + // GET_ALL } } } @@ -337,20 +323,16 @@ RouteCommandManager::RoutingModeMp RouteCommandManager::getCommandRoutingMode() const mqbcmd::SetTunable& tunable = elector.setTunable(); if (tunable.choice().isAllValue()) { - routingMode = - new ClusterRoutingMode(); // CLUSTERS CLUSTER - // STATE - // ELECTOR SET_ALL + routingMode = new ClusterRoutingMode(); + // CLUSTERS CLUSTER STATE ELECTOR SET_ALL } } else if (elector.isGetTunableValue()) { const mqbcmd::GetTunable& tunable = elector.getTunable(); if (tunable.choice().isAllValue()) { - routingMode = - new ClusterRoutingMode(); // CLUSTERS CLUSTER - // STATE - // ELECTOR GET_ALL + routingMode = new ClusterRoutingMode(); + // CLUSTERS CLUSTER STATE ELECTOR GET_ALL } } } @@ -358,7 +340,7 @@ RouteCommandManager::RoutingModeMp RouteCommandManager::getCommandRoutingMode() } } - bslma::ManagedPtr ret(routingMode); + RoutingModeMp ret(routingMode); return ret; } diff --git a/src/groups/mqb/mqba/mqba_commandrouter.h b/src/groups/mqb/mqba/mqba_commandrouter.h index f8795eb9d1..3b28d64fb8 100644 --- a/src/groups/mqb/mqba/mqba_commandrouter.h +++ b/src/groups/mqb/mqba/mqba_commandrouter.h @@ -1,4 +1,4 @@ -// Copyright 2017-2023 Bloomberg Finance L.P. +// Copyright 2024 Bloomberg Finance L.P. // SPDX-License-Identifier: Apache-2.0 // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -23,8 +23,6 @@ //@CLASSES: // mqbcmd::CommandRouter: Manages routing admin commands // -//@DESCRIPTION: -// // BSL #include @@ -56,7 +54,7 @@ class MultiRequestManagerRequestContext; namespace mqba { -class RouteCommandManager { +class CommandRouter { public: typedef bsl::shared_ptr< mqbnet::MultiRequestManagerRequestContextmakeError().message() = os.str(); - return -1; // RETURN - } + if (0 != locateOrCreateDomain(&domainSp, name)) { + mwcu::MemOutStream os; + os << "Domain '" << name << "' doesn't exist"; + result->makeError().message() = os.str(); + return -1; // RETURN } DecodeAndUpsertValue configureResult; diff --git a/src/groups/mqb/mqba/mqba_domainmanager.h b/src/groups/mqb/mqba/mqba_domainmanager.h index 1bb66b4aaa..7dcb52eeff 100644 --- a/src/groups/mqb/mqba/mqba_domainmanager.h +++ b/src/groups/mqb/mqba/mqba_domainmanager.h @@ -332,6 +332,13 @@ class DomainManager BSLS_CPP11_FINAL : public mqbi::DomainFactory { /// specified `errorDescription` otherwise. int locateDomain(DomainSp* domain, const bsl::string& domainName); + /// Load into the specified `domainSp` the domain corresponding to the + /// specified `domainName`, if found. If not found then attempt to create + /// the domain corresponding to `domainName` and load the result into the + /// specified `domainSp`. Return 0 on sucess, or a non-zero return code + /// on failure. + int locateOrCreateDomain(DomainSp* domain, const bsl::string& domainName); + /// Process the specified `command` and load the result of the command /// in the specified `result`. Return zero on success or a nonzero /// value otherwise. diff --git a/src/groups/mqb/mqbblp/mqbblp_cluster.cpp b/src/groups/mqb/mqbblp/mqbblp_cluster.cpp index 99b95c5fd6..61cdf0e02a 100644 --- a/src/groups/mqb/mqbblp/mqbblp_cluster.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_cluster.cpp @@ -538,7 +538,7 @@ void Cluster::processCommandDispatched(mqbcmd::ClusterResult* result, return; // RETURN } else if (command.isForceGcQueuesValue()) { - d_clusterOrchestrator.queueHelper().gcExpiredQueues(true, result); + d_clusterOrchestrator.queueHelper().gcExpiredQueues(result, true); // 'true' implies immediate if (!result->isErrorValue()) { result->makeSuccess(); @@ -3735,19 +3735,23 @@ void Cluster::getPrimaryNodes(bsl::vector* outNodes, pit != partitionsInfo.end(); pit++) { if (pit->primaryStatus() != + // TODO: Handle this case (will want to buffer) bmqp_ctrlmsg::PrimaryStatus::Value::E_ACTIVE) { - BALL_LOG_INFO << "While collecting primary nodes: Partition id " - << pit->partitionId() << " is not active"; + BALL_LOG_WARN << "While collecting primary nodes: Partition id " + << "primary for partition " << pit->partitionId() + << " is not active"; continue; } mqbnet::ClusterNode* primary = pit->primaryNode(); if (primary) { + // Don't add duplicate if (bsl::find(outNodes->begin(), outNodes->end(), primary) != outNodes->end()) { continue; } + // Check for self if (d_state.isSelfActivePrimary(pit->partitionId())) { *outIsSelfPrimary = true; continue; @@ -3757,9 +3761,9 @@ void Cluster::getPrimaryNodes(bsl::vector* outNodes, else { // TODO: handle this case // Approach may include putting into some buffer to callback later? - BALL_LOG_ERROR << "Error while collecting primary nodes: No " - "primary found for partition id " - << pit->partitionId(); + BALL_LOG_WARN << "Error while collecting primary nodes: No " + "primary found for partition id " + << pit->partitionId(); } } } diff --git a/src/groups/mqb/mqbblp/mqbblp_cluster.h b/src/groups/mqb/mqbblp/mqbblp_cluster.h index 90ef7a2da9..3ef6d80b2a 100644 --- a/src/groups/mqb/mqbblp/mqbblp_cluster.h +++ b/src/groups/mqb/mqbblp/mqbblp_cluster.h @@ -629,8 +629,9 @@ class Cluster : public mqbi::Cluster, /// used by this cluster. RequestManagerType& requestManager() BSLS_KEYWORD_OVERRIDE; - mqbi::Cluster::MultiRequestManagerType& - multiRequestManager() BSLS_KEYWORD_OVERRIDE; + /// Return a reference offering modifiable access to the multi request + /// manager used by this cluster. + MultiRequestManagerType& multiRequestManager() BSLS_KEYWORD_OVERRIDE; /// Load the cluster state to the specified `out` object. void loadClusterStatus(mqbcmd::ClusterResult* out) BSLS_KEYWORD_OVERRIDE; @@ -746,7 +747,7 @@ class Cluster : public mqbi::Cluster, /// Gets the node which is the primary for the given partitionId or sets /// outIsSelfPrimary to true if the caller is the primary. - void getPartitionPrimaryNode(mqbnet::ClusterNode** outNodes, + void getPartitionPrimaryNode(mqbnet::ClusterNode** outNode, bool* outIsSelfPrimary, int partitionId) const BSLS_KEYWORD_OVERRIDE; diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterproxy.h b/src/groups/mqb/mqbblp/mqbblp_clusterproxy.h index 238dd8bb23..eb905c87cf 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterproxy.h +++ b/src/groups/mqb/mqbblp/mqbblp_clusterproxy.h @@ -358,8 +358,7 @@ class ClusterProxy : public mqbc::ClusterStateObserver, // Return a reference offering modifiable access to the multi request // manager used by this cluster. - mqbi::Cluster::MultiRequestManagerType& - multiRequestManager() BSLS_KEYWORD_OVERRIDE; + MultiRequestManagerType& multiRequestManager() BSLS_KEYWORD_OVERRIDE; /// Send the specified `request` with the specified `timeout` to the /// specified `target` node. If `target` is 0, it is the Cluster's @@ -536,10 +535,14 @@ class ClusterProxy : public mqbc::ClusterStateObserver, /// Load the cluster state in the specified `out` object. void loadClusterStatus(mqbcmd::ClusterResult* out) BSLS_KEYWORD_OVERRIDE; + /// Gets all the nodes which are a primary for some partition of this + /// cluster void getPrimaryNodes(bsl::vector* outNodes, bool* outIsSelfPrimary) const BSLS_KEYWORD_OVERRIDE; - void getPartitionPrimaryNode(mqbnet::ClusterNode** outNodes, + /// Gets the node which is the primary for the given partitionId or sets + /// outIsSelfPrimary to true if the caller is the primary. + void getPartitionPrimaryNode(mqbnet::ClusterNode** outNode, bool* outIsSelfPrimary, int partitionId) const BSLS_KEYWORD_OVERRIDE; @@ -716,7 +719,7 @@ ClusterProxy::getPrimaryNodes(bsl::vector* outNodes, // no implementation } -inline void +inline void ClusterProxy::getPartitionPrimaryNode(mqbnet::ClusterNode** outNode, bool* outIsSelfPrimary, int partitionId) const diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp index fae0092a3d..4b3c8c3617 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp @@ -5861,8 +5861,8 @@ void ClusterQueueHelper::onCloseQueueResponse( << contextSp->d_peer->nodeDescription(); } -void ClusterQueueHelper::gcExpiredQueues(bool immediate, - mqbcmd::ClusterResult* result) +void ClusterQueueHelper::gcExpiredQueues(mqbcmd::ClusterResult* result, + bool immediate) { // executed by the cluster *DISPATCHER* thread diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h index 19d1398d7a..a5c132ccb8 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h +++ b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h @@ -1088,8 +1088,8 @@ class ClusterQueueHelper : public mqbc::ClusterStateObserver, /// deletion in future. /// Optionally specify a command result object to populate if there is an /// error. - void gcExpiredQueues(bool immediate = false, - mqbcmd::ClusterResult* result = nullptr); + void gcExpiredQueues(mqbcmd::ClusterResult* result = nullptr, + bool immediate = false); ClusterQueueHelper& setOnQueueAssignedCb(const OnQueueAssignedCb& value); diff --git a/src/groups/mqb/mqbi/mqbi_cluster.h b/src/groups/mqb/mqbi/mqbi_cluster.h index a610437e69..c3182aad36 100644 --- a/src/groups/mqb/mqbi/mqbi_cluster.h +++ b/src/groups/mqb/mqbi/mqbi_cluster.h @@ -436,7 +436,7 @@ class Cluster : public DispatcherClient { /// Gets the node which is the primary for the given partitionId or sets /// outIsSelfPrimary to true if the caller is the primary. - virtual void getPartitionPrimaryNode(mqbnet::ClusterNode** outNodes, + virtual void getPartitionPrimaryNode(mqbnet::ClusterNode** outNode, bool* outIsSelfPrimary, int partitionId) const = 0; }; diff --git a/src/groups/mqb/mqbmock/mqbmock_cluster.h b/src/groups/mqb/mqbmock/mqbmock_cluster.h index 527f6fd48a..723e44096f 100644 --- a/src/groups/mqb/mqbmock/mqbmock_cluster.h +++ b/src/groups/mqb/mqbmock/mqbmock_cluster.h @@ -108,7 +108,8 @@ namespace mqbmock { class Cluster : public mqbi::Cluster { private: // PRIVATE TYPES - typedef Cluster::RequestManagerType RequestManagerType; + typedef Cluster::RequestManagerType RequestManagerType; + typedef Cluster::MultiRequestManagerType MultiRequestManagerType; typedef bsl::function EventProcessor; @@ -129,8 +130,6 @@ class Cluster : public mqbi::Cluster { bdlcc::ObjectPoolFunctors::RemoveAll > BlobSpPool; - typedef mqbc::ClusterData::MultiRequestManagerType MultiRequestManagerType; - public: // TYPES typedef bsl::vector ClusterNodeDefs; @@ -315,10 +314,9 @@ class Cluster : public mqbi::Cluster { /// used by this cluster. RequestManagerType& requestManager() BSLS_KEYWORD_OVERRIDE; - // Return a reference offering modifiable access to the multi request - // manager used by this cluster. - mqbi::Cluster::MultiRequestManagerType& - multiRequestManager() BSLS_KEYWORD_OVERRIDE; + /// Return a reference offering modifiable access to the multi request + /// manager used by this cluster. + MultiRequestManagerType& multiRequestManager() BSLS_KEYWORD_OVERRIDE; /// Send the specified `request` with the specified `timeout` to the /// specified `target` node. If `target` is 0, it is the Cluster's @@ -420,9 +418,7 @@ class Cluster : public mqbi::Cluster { mqbc::ClusterData* _clusterData(); /// Get a modifiable reference to this object's cluster state. - mqbc::ClusterState& - _state(); // <--- WHY DOES THIS EXIST? It seems there are no references to - // it, so why have this? + mqbc::ClusterState& _state(); /// Move the test timer forward the specified `seconds`. void advanceTime(int seconds); @@ -430,10 +426,14 @@ class Cluster : public mqbi::Cluster { /// Block until scheduler executes all the scheduled callbacks. void waitForScheduler(); + /// Gets all the nodes which are a primary for some partition of this + /// cluster void getPrimaryNodes(bsl::vector* outNodes, bool* outIsSelfPrimary) const BSLS_KEYWORD_OVERRIDE; - void getPartitionPrimaryNode(mqbnet::ClusterNode** outNodes, + /// Gets all the nodes which are a primary for some partition of this + /// cluster + void getPartitionPrimaryNode(mqbnet::ClusterNode** outNode, bool* outIsSelfPrimary, int partitionId) const BSLS_KEYWORD_OVERRIDE;