Skip to content

Commit

Permalink
Attempt to load domain when not found when getting relevant cluster f…
Browse files Browse the repository at this point in the history
…or route

Signed-off-by: Luke DiGiovanna <[email protected]>
  • Loading branch information
lukedigiovanna committed Jul 18, 2024
1 parent d00d811 commit bc45c98
Show file tree
Hide file tree
Showing 12 changed files with 136 additions and 131 deletions.
7 changes: 4 additions & 3 deletions src/groups/mqb/mqba/mqba_application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -535,11 +535,12 @@ Application::getRelevantCluster(const mqbcmd::CommandChoice& command,

// attempt to locate the domain
bsl::shared_ptr<mqbi::Domain> domainSp;
if (d_domainManager_mp->locateDomain(&domainSp, domainName) != 0) {
if (0 !=
d_domainManager_mp->locateOrCreateDomain(&domainSp, domainName)) {
mwcu::MemOutStream os;
os << "Domain '" << domainName << "' doesn't exist";
cmdResult->makeError().message() = os.str();
return nullptr;
return nullptr; // RETURN
}

return domainSp->cluster();
Expand Down Expand Up @@ -751,7 +752,7 @@ int Application::processCommand(const bslstl::StringRef& source,
}

// otherwise, this is an original call. utilize router if necessary
mqba::RouteCommandManager routeCommandManager(cmd, commandWithOptions);
mqba::CommandRouter routeCommandManager(cmd, commandWithOptions);

bool shouldSelfExecute = true;
bsl::string selfName;
Expand Down
114 changes: 48 additions & 66 deletions src/groups/mqb/mqba/mqba_commandrouter.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2014-2023 Bloomberg Finance L.P.
// Copyright 2024 Bloomberg Finance L.P.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -34,9 +34,8 @@ namespace {
const char k_LOG_CATEGORY[] = "MQBA.COMMANDROUTER";
} // close unnamed namespace

RouteCommandManager::RouteCommandManager(
const bsl::string& commandString,
const mqbcmd::Command& commandWithOptions)
CommandRouter::CommandRouter(const bsl::string& commandString,
const mqbcmd::Command& commandWithOptions)
: d_commandString(commandString)
, d_commandWithOptions(commandWithOptions)
, d_command(commandWithOptions.choice())
Expand All @@ -49,21 +48,21 @@ RouteCommandManager::RouteCommandManager(
}
}

RouteCommandManager::RoutingMode::RoutingMode()
CommandRouter::RoutingMode::RoutingMode()
{
}

RouteCommandManager::RoutingMode::~RoutingMode()
CommandRouter::RoutingMode::~RoutingMode()
{
}

RouteCommandManager::AllPartitionPrimariesRoutingMode::
CommandRouter::AllPartitionPrimariesRoutingMode::
AllPartitionPrimariesRoutingMode()
{
}

RouteCommandManager::RouteMembers
RouteCommandManager::AllPartitionPrimariesRoutingMode::getRouteMembers(
CommandRouter::RouteMembers
CommandRouter::AllPartitionPrimariesRoutingMode::getRouteMembers(
mqbi::Cluster* cluster)
{
NodesVector primaryNodes;
Expand All @@ -81,14 +80,14 @@ RouteCommandManager::AllPartitionPrimariesRoutingMode::getRouteMembers(
return {primaryNodes, isSelfPrimary};
}

RouteCommandManager::SinglePartitionPrimaryRoutingMode::
CommandRouter::SinglePartitionPrimaryRoutingMode::
SinglePartitionPrimaryRoutingMode(int partitionId)
: d_partitionId(partitionId)
{
}

RouteCommandManager::RouteMembers
RouteCommandManager::SinglePartitionPrimaryRoutingMode::getRouteMembers(
CommandRouter::RouteMembers
CommandRouter::SinglePartitionPrimaryRoutingMode::getRouteMembers(
mqbi::Cluster* cluster)
{
mqbnet::ClusterNode* node = nullptr;
Expand All @@ -113,13 +112,12 @@ RouteCommandManager::SinglePartitionPrimaryRoutingMode::getRouteMembers(
return {nodes, isSelfPrimary};
}

RouteCommandManager::ClusterRoutingMode::ClusterRoutingMode()
CommandRouter::ClusterRoutingMode::ClusterRoutingMode()
{
}

RouteCommandManager::RouteMembers
RouteCommandManager::ClusterRoutingMode::getRouteMembers(
mqbi::Cluster* cluster)
CommandRouter::RouteMembers
CommandRouter::ClusterRoutingMode::getRouteMembers(mqbi::Cluster* cluster)
{
typedef mqbnet::Cluster::NodesList NodesList;
// collect all nodes in cluster
Expand All @@ -141,7 +139,7 @@ RouteCommandManager::ClusterRoutingMode::getRouteMembers(
};
}

bool RouteCommandManager::route(mqbi::Cluster* relevantCluster)
bool CommandRouter::route(mqbi::Cluster* relevantCluster)
{
// PRECONDITIONS
BSLS_ASSERT_SAFE(d_routingMode);
Expand All @@ -155,9 +153,9 @@ bool RouteCommandManager::route(mqbi::Cluster* relevantCluster)
RouteMembers routeMembers = d_routingMode->getRouteMembers(
relevantCluster);

if (routeMembers.nodes.size() == 0) {
if (routeMembers.d_nodes.size() == 0) {
countDownLatch();
return routeMembers.self;
return routeMembers.d_self;
}

RequestContextSp contextSp =
Expand All @@ -169,15 +167,15 @@ bool RouteCommandManager::route(mqbi::Cluster* relevantCluster)
adminCommand.command() = d_commandString;
adminCommand.rerouted() = true;

contextSp->setDestinationNodes(routeMembers.nodes);
contextSp->setDestinationNodes(routeMembers.d_nodes);

mwcu::MemOutStream os;
os << "Routing command to the following nodes [";
for (NodesVector::const_iterator nit = routeMembers.nodes.begin();
nit != routeMembers.nodes.end();
for (NodesVector::const_iterator nit = routeMembers.d_nodes.begin();
nit != routeMembers.d_nodes.end();
nit++) {
os << (*nit)->hostName();
if (nit + 1 != routeMembers.nodes.end()) {
if (nit + 1 != routeMembers.d_nodes.end()) {
os << ", ";
}
}
Expand All @@ -186,17 +184,17 @@ bool RouteCommandManager::route(mqbi::Cluster* relevantCluster)
BALL_LOG_INFO << os.str();

contextSp->setResponseCb(
bdlf::BindUtil::bind(&RouteCommandManager::onRouteCommandResponse,
bdlf::BindUtil::bind(&CommandRouter::onRouteCommandResponse,
this,
bdlf::PlaceHolders::_1));

relevantCluster->multiRequestManager().sendRequest(contextSp,
bsls::TimeInterval(3));

return routeMembers.self;
return routeMembers.d_self;
}

void RouteCommandManager::onRouteCommandResponse(
void CommandRouter::onRouteCommandResponse(
const MultiRequestContextSp& requestContext)
{
typedef bsl::pair<mqbnet::ClusterNode*, bmqp_ctrlmsg::ControlMessage>
Expand Down Expand Up @@ -249,33 +247,27 @@ void RouteCommandManager::onRouteCommandResponse(
countDownLatch();
}

RouteCommandManager::RoutingModeMp RouteCommandManager::getCommandRoutingMode()
CommandRouter::RoutingModeMp CommandRouter::getCommandRoutingMode()
{
RouteCommandManager::RoutingMode* routingMode = nullptr;
RoutingMode* routingMode = nullptr;
if (d_command.isDomainsValue()) {
const mqbcmd::DomainsCommand& domains = d_command.domains();
if (domains.isDomainValue()) {
const mqbcmd::DomainCommand& domain = domains.domain().command();
if (domain.isPurgeValue()) {
routingMode =
new AllPartitionPrimariesRoutingMode(); // DOMAINS DOMAIN
// <name> PURGE
routingMode = new AllPartitionPrimariesRoutingMode();
// DOMAINS DOMAIN <name> PURGE
}
else if (domain.isQueueValue()) {
if (domain.queue().command().isPurgeAppIdValue()) {
routingMode =
new AllPartitionPrimariesRoutingMode(); // DOMAINS
// DOMAIN
// <name>
// QUEUE
// <name>
// PURGE
routingMode = new AllPartitionPrimariesRoutingMode();
// DOMAINS DOMAIN <name> QUEUE <name> PURGE
}
}
}
else if (domains.isReconfigureValue()) {
routingMode =
new ClusterRoutingMode(); // DOMAINS RECONFIGURE <domain>
routingMode = new ClusterRoutingMode();
// DOMAINS RECONFIGURE <domain>
}
}
else if (d_command.isClustersValue()) {
Expand All @@ -284,10 +276,8 @@ RouteCommandManager::RoutingModeMp RouteCommandManager::getCommandRoutingMode()
const mqbcmd::ClusterCommand& cluster =
clusters.cluster().command();
if (cluster.isForceGcQueuesValue()) {
routingMode =
new AllPartitionPrimariesRoutingMode(); // CLUSTERS
// CLUSTER <name>
// FORCE_GC_QUEUES
routingMode = new AllPartitionPrimariesRoutingMode();
// CLUSTERS CLUSTER <name> FORCE_GC_QUEUES
}
else if (cluster.isStorageValue()) {
const mqbcmd::StorageCommand& storage = cluster.storage();
Expand All @@ -296,9 +286,9 @@ RouteCommandManager::RoutingModeMp RouteCommandManager::getCommandRoutingMode()
storage.partition().command().isDisableValue()) {
int partitionId = storage.partition().partitionId();
routingMode = new SinglePartitionPrimaryRoutingMode(
partitionId); // CLUSTERS CLUSTER <name> STORAGE
// PARTITION <partitionId>
// [ENABLE|DISABLE]
partitionId);
// CLUSTERS CLUSTER <name> STORAGE PARTITION
// <partitionId> [ENABLE|DISABLE]
}
// SUMMARY doesn't need to route to primary
}
Expand All @@ -309,22 +299,18 @@ RouteCommandManager::RoutingModeMp RouteCommandManager::getCommandRoutingMode()
const mqbcmd::SetTunable& tunable =
replication.setTunable();
if (tunable.choice().isAllValue()) {
routingMode =
new ClusterRoutingMode(); // CLUSTERS CLUSTER
// <name> STORAGE
// REPLICATION
// SET_ALL
routingMode = new ClusterRoutingMode();
// CLUSTERS CLUSTER <name> STORAGE REPLICATION
// SET_ALL
}
}
else if (replication.isGetTunableValue()) {
const mqbcmd::GetTunable& tunable =
replication.getTunable();
if (tunable.choice().isAllValue()) {
routingMode =
new ClusterRoutingMode(); // CLUSTERS CLUSTER
// <name> STORAGE
// REPLICATION
// GET_ALL
routingMode = new ClusterRoutingMode();
// CLUSTERS CLUSTER <name> STORAGE REPLICATION
// GET_ALL
}
}
}
Expand All @@ -337,28 +323,24 @@ RouteCommandManager::RoutingModeMp RouteCommandManager::getCommandRoutingMode()
const mqbcmd::SetTunable& tunable =
elector.setTunable();
if (tunable.choice().isAllValue()) {
routingMode =
new ClusterRoutingMode(); // CLUSTERS CLUSTER
// <name> STATE
// ELECTOR SET_ALL
routingMode = new ClusterRoutingMode();
// CLUSTERS CLUSTER <name> STATE ELECTOR SET_ALL
}
}
else if (elector.isGetTunableValue()) {
const mqbcmd::GetTunable& tunable =
elector.getTunable();
if (tunable.choice().isAllValue()) {
routingMode =
new ClusterRoutingMode(); // CLUSTERS CLUSTER
// <name> STATE
// ELECTOR GET_ALL
routingMode = new ClusterRoutingMode();
// CLUSTERS CLUSTER <name> STATE ELECTOR GET_ALL
}
}
}
}
}
}

bslma::ManagedPtr<RoutingMode> ret(routingMode);
RoutingModeMp ret(routingMode);

return ret;
}
Expand Down
22 changes: 10 additions & 12 deletions src/groups/mqb/mqba/mqba_commandrouter.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2017-2023 Bloomberg Finance L.P.
// Copyright 2024 Bloomberg Finance L.P.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -23,8 +23,6 @@
//@CLASSES:
// mqbcmd::CommandRouter: Manages routing admin commands
//
//@DESCRIPTION:
//

// BSL
#include <bsl_iostream.h>
Expand Down Expand Up @@ -56,7 +54,7 @@ class MultiRequestManagerRequestContext;

namespace mqba {

class RouteCommandManager {
class CommandRouter {
public:
typedef bsl::shared_ptr<
mqbnet::MultiRequestManagerRequestContext<bmqp_ctrlmsg::ControlMessage,
Expand All @@ -67,8 +65,8 @@ class RouteCommandManager {

private:
struct RouteMembers {
NodesVector nodes;
bool self;
NodesVector d_nodes;
bool d_self;
};

class RoutingMode {
Expand Down Expand Up @@ -120,8 +118,8 @@ class RouteCommandManager {
public:
/// Sets up a command router with the given command string and parsed
/// command object. This will
RouteCommandManager(const bsl::string& commandString,
const mqbcmd::Command& commandWithOptions);
CommandRouter(const bsl::string& commandString,
const mqbcmd::Command& commandWithOptions);

/// Returns true if this command router is necessary to route the command
/// that it was set up with. If the command does not require routing, then
Expand Down Expand Up @@ -153,22 +151,22 @@ class RouteCommandManager {
void routeCommand(const NodesVector& nodes);
};

inline bool RouteCommandManager::isRoutingNeeded() const
inline bool CommandRouter::isRoutingNeeded() const
{
return d_routingMode.get() != nullptr;
}

inline mqbcmd::RouteResponseList& RouteCommandManager::responses()
inline mqbcmd::RouteResponseList& CommandRouter::responses()
{
return d_responses;
}

inline void RouteCommandManager::waitForResponses()
inline void CommandRouter::waitForResponses()
{
d_latch.wait();
}

inline void RouteCommandManager::countDownLatch()
inline void CommandRouter::countDownLatch()
{
d_latch.countDown(1);
}
Expand Down
Loading

0 comments on commit bc45c98

Please sign in to comment.