Skip to content

Commit

Permalink
Add error message when routing fails due to inactive/no primary
Browse files Browse the repository at this point in the history
Signed-off-by: Luke DiGiovanna <[email protected]>
  • Loading branch information
lukedigiovanna committed Jul 18, 2024
1 parent bc45c98 commit 406c199
Show file tree
Hide file tree
Showing 8 changed files with 148 additions and 91 deletions.
7 changes: 6 additions & 1 deletion src/groups/mqb/mqba/mqba_application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -764,7 +764,12 @@ int Application::processCommand(const bslstl::StringRef& source,
return -2;
}
selfName = cluster->netCluster().selfNode()->hostName();
shouldSelfExecute = routeCommandManager.route(cluster);
shouldSelfExecute = routeCommandManager.route(&cmdResult, cluster);

if (cmdResult.isErrorValue()) {
printCommandResult(cmdResult, commandWithOptions.encoding(), os);
return -2;
}
}

if (shouldSelfExecute) {
Expand Down
48 changes: 34 additions & 14 deletions src/groups/mqb/mqba/mqba_commandrouter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,21 +63,24 @@ CommandRouter::AllPartitionPrimariesRoutingMode::

/// Return the route members for a command that must reach the primary of
/// every partition of the specified `cluster`: the primary nodes reported by
/// `mqbi::Cluster::getPrimaryNodes` and whether this node is itself a
/// primary.  Any failure reported by that query is loaded into the specified
/// `result`; callers are expected to check `result->isErrorValue()` before
/// using the returned members.
CommandRouter::RouteMembers
CommandRouter::AllPartitionPrimariesRoutingMode::getRouteMembers(
    mqbcmd::InternalResult* result,
    mqbi::Cluster*          cluster)
{
    // PRECONDITIONS
    BSLS_ASSERT_SAFE(result);
    BSLS_ASSERT_SAFE(cluster);

    NodesVector primaryNodes;

    // Start from a well-defined value: not every 'mqbi::Cluster'
    // implementation writes to its output arguments.
    bool isSelfPrimary = false;

    // The query is bound and handed to the cluster's dispatcher rather than
    // called directly.
    // NOTE(review): presumably because the query must run on the cluster's
    // dispatcher thread -- confirm against the 'mqbi::Cluster' contract.
    cluster->dispatcher()->execute(
        bdlf::BindUtil::bind(&mqbi::Cluster::getPrimaryNodes,
                             cluster,
                             &primaryNodes,
                             &isSelfPrimary,
                             result),
        cluster);

    // NOTE(review): presumably blocks until the functor queued above has
    // run, so that the locals are populated before being read -- confirm
    // against the dispatcher's contract.
    cluster->dispatcher()->synchronize(cluster);

    // Positional aggregate initialization (nodes first, then the 'self'
    // flag) keeps this translation unit free of C++20-only designated
    // initializers.
    return {primaryNodes, isSelfPrimary};
}

CommandRouter::SinglePartitionPrimaryRoutingMode::
Expand All @@ -88,16 +91,19 @@ CommandRouter::SinglePartitionPrimaryRoutingMode::

CommandRouter::RouteMembers
CommandRouter::SinglePartitionPrimaryRoutingMode::getRouteMembers(
mqbi::Cluster* cluster)
mqbcmd::InternalResult* result,
mqbi::Cluster* cluster)
{
mqbnet::ClusterNode* node = nullptr;
bool isSelfPrimary = false;
mqbnet::ClusterNode* node;
bool isSelfPrimary;
bool success;

cluster->dispatcher()->execute(
bdlf::BindUtil::bind(&mqbi::Cluster::getPartitionPrimaryNode,
cluster,
&node,
&isSelfPrimary,
result,
d_partitionId),
cluster);

Expand All @@ -109,15 +115,19 @@ CommandRouter::SinglePartitionPrimaryRoutingMode::getRouteMembers(
nodes.push_back(node);
}

return {nodes, isSelfPrimary};
return {
.d_nodes = nodes,
.d_self = isSelfPrimary,
};
}

// Create a cluster routing mode.  The body is intentionally empty: the
// constructor performs no initialization of its own.
CommandRouter::ClusterRoutingMode::ClusterRoutingMode()
{
}

CommandRouter::RouteMembers
CommandRouter::ClusterRoutingMode::getRouteMembers(mqbi::Cluster* cluster)
CommandRouter::RouteMembers CommandRouter::ClusterRoutingMode::getRouteMembers(
mqbcmd::InternalResult* result,
mqbi::Cluster* cluster)
{
typedef mqbnet::Cluster::NodesList NodesList;
// collect all nodes in cluster
Expand All @@ -134,24 +144,34 @@ CommandRouter::ClusterRoutingMode::getRouteMembers(mqbi::Cluster* cluster)
}

return {
nodes,
true // Cluster routing always requires original node to exec.
.d_nodes = nodes,
.d_self =
true, // Cluster routing always requires original node to exec.
};
}

bool CommandRouter::route(mqbi::Cluster* relevantCluster)
bool CommandRouter::route(mqbcmd::InternalResult* result,
mqbi::Cluster* relevantCluster)
{
// PRECONDITIONS
BSLS_ASSERT_SAFE(d_routingMode);
BSLS_ASSERT_SAFE(result);
BSLS_ASSERT_SAFE(relevantCluster);

typedef mqbnet::MultiRequestManager<bmqp_ctrlmsg::ControlMessage,
bmqp_ctrlmsg::ControlMessage,
mqbnet::ClusterNode*>::RequestContextSp
RequestContextSp;

RouteMembers routeMembers = d_routingMode->getRouteMembers(
relevantCluster);
RouteMembers routeMembers =
d_routingMode->getRouteMembers(result, relevantCluster);

// Failed to get route members, most likely due to not having an active
// primary.
if (result->isErrorValue()) {
countDownLatch();
return false;
}

if (routeMembers.d_nodes.size() == 0) {
countDownLatch();
Expand Down
14 changes: 9 additions & 5 deletions src/groups/mqb/mqba/mqba_commandrouter.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,16 @@ class CommandRouter {
RoutingMode();
virtual ~RoutingMode() = 0;

virtual RouteMembers getRouteMembers(mqbi::Cluster* cluster) = 0;
virtual RouteMembers getRouteMembers(mqbcmd::InternalResult* result,
mqbi::Cluster* cluster) = 0;
};
/// Routing mode whose route members are the primary nodes of the partitions
/// of a cluster, as obtained from `mqbi::Cluster::getPrimaryNodes`.
class AllPartitionPrimariesRoutingMode : public RoutingMode {
public:
AllPartitionPrimariesRoutingMode();

/// Return the primary nodes of the specified `cluster` and whether this
/// node is one of them.
RouteMembers
getRouteMembers(mqbi::Cluster* cluster) BSLS_KEYWORD_OVERRIDE;
getRouteMembers(mqbcmd::InternalResult* result,
mqbi::Cluster* cluster) BSLS_KEYWORD_OVERRIDE;
};
class SinglePartitionPrimaryRoutingMode : public RoutingMode {
private:
Expand All @@ -91,14 +93,16 @@ class CommandRouter {
SinglePartitionPrimaryRoutingMode(int partitionId);

RouteMembers
getRouteMembers(mqbi::Cluster* cluster) BSLS_KEYWORD_OVERRIDE;
getRouteMembers(mqbcmd::InternalResult* result,
mqbi::Cluster* cluster) BSLS_KEYWORD_OVERRIDE;
};
/// Routing mode whose route members are collected from all the nodes of a
/// cluster; the originating node is always asked to execute the command as
/// well.
class ClusterRoutingMode : public RoutingMode {
public:
ClusterRoutingMode();

/// Return the nodes collected from the specified `cluster`, with the
/// "self" flag always set.
RouteMembers
getRouteMembers(mqbi::Cluster* cluster) BSLS_KEYWORD_OVERRIDE;
getRouteMembers(mqbcmd::InternalResult* result,
mqbi::Cluster* cluster) BSLS_KEYWORD_OVERRIDE;
};

public:
Expand Down Expand Up @@ -128,7 +132,7 @@ class CommandRouter {

/// Performs any routing on the command and returns true if the caller
/// should also execute the command.
bool route(mqbi::Cluster* relevantCluster);
bool route(mqbcmd::InternalResult* result, mqbi::Cluster* relevantCluster);

/// Waits on a latch that triggers when the responses have been received.
void waitForResponses();
Expand Down
81 changes: 50 additions & 31 deletions src/groups/mqb/mqbblp/mqbblp_cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3715,20 +3715,21 @@ void Cluster::processResponse(const bmqp_ctrlmsg::ControlMessage& response)
this);
}

void Cluster::getPrimaryNodes(bsl::vector<mqbnet::ClusterNode*>* outNodes,
bool* outIsSelfPrimary) const
void Cluster::getPrimaryNodes(bsl::vector<mqbnet::ClusterNode*>* nodes,
bool* isSelfPrimary,
mqbcmd::InternalResult* result) const
{
// PRECONDITIONS
BSLS_ASSERT_SAFE(outNodes);
BSLS_ASSERT_SAFE(outIsSelfPrimary);
BSLS_ASSERT_SAFE(nodes);
BSLS_ASSERT_SAFE(isSelfPrimary);
BSLS_ASSERT_SAFE(result);
BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(this));

const mqbnet::Cluster::NodesList& nodes = netCluster().nodes();
const mqbc::ClusterState::PartitionsInfo& partitionsInfo =
d_state.partitions();

*outIsSelfPrimary = false;
outNodes->clear();
nodes->clear();
*isSelfPrimary = false;

for (mqbc::ClusterState::PartitionsInfo::const_iterator pit =
partitionsInfo.begin();
Expand All @@ -3737,44 +3738,51 @@ void Cluster::getPrimaryNodes(bsl::vector<mqbnet::ClusterNode*>* outNodes,
if (pit->primaryStatus() !=
// TODO: Handle this case (will want to buffer)
bmqp_ctrlmsg::PrimaryStatus::Value::E_ACTIVE) {
BALL_LOG_WARN << "While collecting primary nodes: Partition id "
BALL_LOG_WARN << "While collecting primary nodes: "
<< "primary for partition " << pit->partitionId()
<< " is not active";
continue;
result->makeError().message() =
"Primary is not active for partition id " +
bsl::to_string(pit->partitionId());
return;
}

mqbnet::ClusterNode* primary = pit->primaryNode();

if (primary) {
// Don't add duplicate
if (bsl::find(outNodes->begin(), outNodes->end(), primary) !=
outNodes->end()) {
if (bsl::find(nodes->begin(), nodes->end(), primary) !=
nodes->end()) {
continue;
}
// Check for self
if (d_state.isSelfActivePrimary(pit->partitionId())) {
*outIsSelfPrimary = true;
*isSelfPrimary = true;
continue;
}
outNodes->push_back(primary);
nodes->push_back(primary);
}
else {
// TODO: handle this case
// Approach may include putting into some buffer to callback later?
BALL_LOG_WARN << "Error while collecting primary nodes: No "
"primary found for partition id "
<< pit->partitionId();
result->makeError().message() =
"No primary found for partition id " +
bsl::to_string(pit->partitionId());
return;
}
}
}

void Cluster::getPartitionPrimaryNode(mqbnet::ClusterNode** outNode,
bool* outIsSelfPrimary,
int partitionId) const
void Cluster::getPartitionPrimaryNode(mqbnet::ClusterNode** node,
bool* isSelfPrimary,
mqbcmd::InternalResult* result,
int partitionId) const
{
// PRECONDITIONS
BSLS_ASSERT_SAFE(outNode);
BSLS_ASSERT_SAFE(outIsSelfPrimary);
BSLS_ASSERT_SAFE(node);
BSLS_ASSERT_SAFE(isSelfPrimary);
BSLS_ASSERT_SAFE(result);
BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(this));

const mqbc::ClusterState::PartitionsInfo& partitionsInfo =
Expand All @@ -3785,28 +3793,39 @@ void Cluster::getPartitionPrimaryNode(mqbnet::ClusterNode** outNode,
pit != partitionsInfo.end();
pit++) {
if (pit->partitionId() == partitionId) {
mqbnet::ClusterNode* primary = pit->primaryNode();
// Self is active primary
if (d_state.isSelfActivePrimary(partitionId)) {
*outIsSelfPrimary = true;
*isSelfPrimary = true;
return; // RETURN
}
if (primary) {
*outNode = primary;

// No active primary
if (!d_state.hasActivePrimary(partitionId)) {
result->makeError().message() =
"No active primary for partition id " +
bsl::to_string(partitionId);
return; // RETURN
}
else {
// TODO: handle this case
// ...

// Partition has active primary, get it and return that
mqbnet::ClusterNode* primary = pit->primaryNode();
if (primary) {
*node = primary;
return; // RETURN
}

// No primary node
result->makeError().message() =
"No primary node for partition id " +
bsl::to_string(partitionId);
return; // RETURN
}
}

// Didn't find a corresponding partition for the given partitionId
// TODO: handle this case
// For now just execute on this node and an appropriate error response
// will be generated.
*outIsSelfPrimary = true;
// Just execute on this node and an appropriate error response will be
// generated.
*isSelfPrimary = true;
}

} // close package namespace
Expand Down
13 changes: 8 additions & 5 deletions src/groups/mqb/mqbblp/mqbblp_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -740,15 +740,18 @@ class Cluster : public mqbi::Cluster,
const mqbnet::Cluster& netCluster() const BSLS_KEYWORD_OVERRIDE;

/// Gets all the nodes which are a primary for some partition of this
/// cluster and whether or not this node is a primary. The outNodes
/// cluster and whether or not this node is a primary. The nodes
/// vector will never include the self node.
void getPrimaryNodes(bsl::vector<mqbnet::ClusterNode*>* outNodes,
bool* outIsSelfPrimary) const BSLS_KEYWORD_OVERRIDE;
void getPrimaryNodes(bsl::vector<mqbnet::ClusterNode*>* nodes,
bool* isSelfPrimary,
mqbcmd::InternalResult* result) const
BSLS_KEYWORD_OVERRIDE;

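/// Gets the node which is the primary for the given partitionId or sets
/// isSelfPrimary to true if the caller is the primary. If the partition
/// has no active primary or no primary node, an error is loaded into
/// result instead.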
void getPartitionPrimaryNode(mqbnet::ClusterNode** outNode,
bool* outIsSelfPrimary,
void getPartitionPrimaryNode(mqbnet::ClusterNode** node,
bool* isSelfPrimary,
mqbcmd::InternalResult* result,
int partitionId) const BSLS_KEYWORD_OVERRIDE;

/// Print the state of the cluster to the specified `out`.
Expand Down
31 changes: 16 additions & 15 deletions src/groups/mqb/mqbblp/mqbblp_clusterproxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -535,15 +535,14 @@ class ClusterProxy : public mqbc::ClusterStateObserver,
/// Load the cluster state in the specified `out` object.
void loadClusterStatus(mqbcmd::ClusterResult* out) BSLS_KEYWORD_OVERRIDE;

/// Gets all the nodes which are a primary for some partition of this
/// cluster
void getPrimaryNodes(bsl::vector<mqbnet::ClusterNode*>* outNodes,
bool* outIsSelfPrimary) const BSLS_KEYWORD_OVERRIDE;

/// Gets the node which is the primary for the given partitionId or sets
/// outIsSelfPrimary to true if the caller is the primary.
void getPartitionPrimaryNode(mqbnet::ClusterNode** outNode,
bool* outIsSelfPrimary,
void getPrimaryNodes(bsl::vector<mqbnet::ClusterNode*>* nodes,
bool* isSelfPrimary,
mqbcmd::InternalResult* result) const
BSLS_KEYWORD_OVERRIDE;

void getPartitionPrimaryNode(mqbnet::ClusterNode** node,
bool* isSelfPrimary,
mqbcmd::InternalResult* result,
int partitionId) const BSLS_KEYWORD_OVERRIDE;

// MANIPULATORS
Expand Down Expand Up @@ -713,16 +712,18 @@ inline size_t ClusterProxy::ChannelBuffer::bytes() const
// ------------------

inline void
ClusterProxy::getPrimaryNodes(bsl::vector<mqbnet::ClusterNode*>* outNodes,
bool* outIsSelfPrimary) const
ClusterProxy::getPrimaryNodes(bsl::vector<mqbnet::ClusterNode*>* nodes,
bool* isSelfPrimary,
mqbcmd::InternalResult* result) const
{
// no implementation
}

inline void
ClusterProxy::getPartitionPrimaryNode(mqbnet::ClusterNode** outNode,
bool* outIsSelfPrimary,
int partitionId) const
inline void
ClusterProxy::getPartitionPrimaryNode(mqbnet::ClusterNode** node,
bool* isSelfPrimary,
mqbcmd::InternalResult* result,
int partitionId) const
{
// no implementation
}
Expand Down
Loading

0 comments on commit 406c199

Please sign in to comment.