Skip to content

Commit

Permalink
Fix[MQB]: Enable strong consistency CSL and fix leader activeness (#495)
Browse files Browse the repository at this point in the history
* mqbc::ElectorInfo: Prohibit e_UNDEFINED -> e_ACTIVE, and fix observers

Signed-off-by: Yuan Jing Vincent Yan <[email protected]>

* mqbc::IncoreCSL: Enable strong consistency

Signed-off-by: Yuan Jing Vincent Yan <[email protected]>

* Apply clang format

Signed-off-by: Yuan Jing Vincent Yan <[email protected]>

* mqbc::IncoreCSL.t: Use bmqtst::TestHelperUtil::allocator

Signed-off-by: Yuan Jing Vincent Yan <[email protected]>

---------

Signed-off-by: Yuan Jing Vincent Yan <[email protected]>
  • Loading branch information
kaikulimu authored Dec 11, 2024
1 parent acf74d3 commit 0d28d61
Show file tree
Hide file tree
Showing 21 changed files with 524 additions and 584 deletions.
11 changes: 7 additions & 4 deletions src/groups/mqb/mqbblp/mqbblp_cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3549,10 +3549,13 @@ void Cluster::onClusterLeader(mqbnet::ClusterNode* node,
}

d_clusterOrchestrator.updateDatumStats();
d_clusterData.stats().setIsLeader(
d_clusterData.membership().selfNode() == node
? mqbstat::ClusterStats::LeaderStatus::e_LEADER
: mqbstat::ClusterStats::LeaderStatus::e_FOLLOWER);

if (status == mqbc::ElectorInfoLeaderStatus::e_ACTIVE) {
d_clusterData.stats().setIsLeader(
d_clusterData.membership().selfNode() == node
? mqbstat::ClusterStats::LeaderStatus::e_LEADER
: mqbstat::ClusterStats::LeaderStatus::e_FOLLOWER);
}
}

void Cluster::onLeaderPassiveThreshold()
Expand Down
8 changes: 2 additions & 6 deletions src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -587,9 +587,7 @@ ClusterOrchestrator::ClusterOrchestrator(
mqbc::IncoreClusterStateLedger>(
d_allocators.get("ClusterStateLedger"),
clusterConfig,
mqbc::ClusterStateLedgerConsistency::e_EVENTUAL,
// TODO Add cluster config to determine Eventual vs
// Strong
mqbc::ClusterStateLedgerConsistency::e_STRONG,
d_clusterData_p,
clusterState,
&d_clusterData_p->blobSpPool())),
Expand All @@ -606,9 +604,7 @@ ClusterOrchestrator::ClusterOrchestrator(
mqbc::IncoreClusterStateLedger>(
d_allocators.get("ClusterStateLedger"),
clusterConfig,
mqbc::ClusterStateLedgerConsistency::e_EVENTUAL,
// TODO Add cluster config to determine Eventual vs
// Strong
mqbc::ClusterStateLedgerConsistency::e_STRONG,
d_clusterData_p,
clusterState,
&d_clusterData_p->blobSpPool())),
Expand Down
4 changes: 4 additions & 0 deletions src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,10 @@ void ClusterProxy::onActiveNodeUp(mqbnet::ClusterNode* activeNode)
mqbnet::ElectorState::e_LEADER,
d_clusterData.electorInfo().electorTerm() + 1,
activeNode,
mqbc::ElectorInfoLeaderStatus::e_PASSIVE);
// It is **prohibited** to set leader status directly from e_UNDEFINED
// to e_ACTIVE. Hence, we do: e_UNDEFINED -> e_PASSIVE -> e_ACTIVE
d_clusterData.electorInfo().setLeaderStatus(
mqbc::ElectorInfoLeaderStatus::e_ACTIVE);
}

Expand Down
4 changes: 3 additions & 1 deletion src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3911,7 +3911,9 @@ void ClusterQueueHelper::onClusterLeader(
<< (node ? node->nodeDescription() : "** none **")
<< ", leader status: " << status;

restoreState(mqbs::DataStore::k_ANY_PARTITION_ID);
if (status == mqbc::ElectorInfoLeaderStatus::e_ACTIVE) {
restoreState(mqbs::DataStore::k_ANY_PARTITION_ID);
}

if (d_cluster_p->isRemote()) {
// non-proxy (replica) case is handled by
Expand Down
35 changes: 1 addition & 34 deletions src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -943,7 +943,6 @@ void ClusterStateManager::processPartitionPrimaryAdvisoryRaw(
}

d_isFirstLeaderAdvisory = false;
d_clusterStateLedger_mp->setIsFirstLeaderAdvisory(d_isFirstLeaderAdvisory);
}

// PRIVATE MANIPULATORS
Expand Down Expand Up @@ -979,8 +978,6 @@ void ClusterStateManager::onPartitionPrimaryAssignment(
oldLeaseId);

d_isFirstLeaderAdvisory = false;
d_clusterStateLedger_mp->setIsFirstLeaderAdvisory(
d_isFirstLeaderAdvisory);
}

d_afterPartitionPrimaryAssignmentCb(partitionId, primary, status);
Expand Down Expand Up @@ -1247,7 +1244,6 @@ void ClusterStateManager::sendClusterState(

// Self is leader and has published advisory above, so update it.
d_isFirstLeaderAdvisory = false;
d_clusterStateLedger_mp->setIsFirstLeaderAdvisory(d_isFirstLeaderAdvisory);

mqbc::ClusterUtil::sendClusterState(d_clusterData_p,
d_clusterStateLedger_mp.get(),
Expand Down Expand Up @@ -2018,35 +2014,6 @@ void ClusterStateManager::processClusterStateEvent(
bmqp::Event rawEvent(event.blob().get(), d_allocator_p);
BSLS_ASSERT_SAFE(rawEvent.isClusterStateEvent());

// NOTE: Any validation of the event would go here.
if (source != d_clusterData_p->electorInfo().leaderNode()) {
BALL_LOG_WARN << d_clusterData_p->identity().description()
<< ": ignoring cluster state event from cluster node "
<< source->nodeDescription() << " as this node is not "
<< "the current perceived leader. Current leader: ["
<< d_clusterData_p->electorInfo().leaderNodeId() << ": "
<< (d_clusterData_p->electorInfo().leaderNode()
? d_clusterData_p->electorInfo()
.leaderNode()
->nodeDescription()
: "* UNKNOWN *")
<< "]";
return; // RETURN
}
// 'source' is the perceived leader

// TBD: Suppress the following check for now, which will help some
// integration tests to pass. At this point, it is not clear if it is safe
// to process cluster state events while self is stopping.
//
// if ( bmqp_ctrlmsg::NodeStatus::E_STOPPING
// == d_clusterData_p->membership().selfNodeStatus()) {
// return; // RETURN
// }

// TODO: Validate the incoming advisory and potentially buffer it for later
// if the node is currently starting.

const int rc = d_clusterStateLedger_mp->apply(*rawEvent.blob(), source);
if (rc != 0) {
BALL_LOG_ERROR << d_clusterData_p->identity().description()
Expand Down Expand Up @@ -2187,7 +2154,7 @@ void ClusterStateManager::processLeaderAdvisory(

// Leader status and sequence number are updated unconditionally. They
// may have been set by one of the routines called earlier in this method,
// but there is no harm is setting these values again.
// but there is no harm in setting these values again.

d_clusterData_p->electorInfo().setLeaderMessageSequence(leaderMsgSeq);
d_clusterData_p->electorInfo().setLeaderStatus(
Expand Down
2 changes: 1 addition & 1 deletion src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.h
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ class ClusterStateManager BSLS_KEYWORD_FINAL
const bmqp_ctrlmsg::ControlMessage& message,
mqbnet::ClusterNode* source) BSLS_KEYWORD_OVERRIDE;

/// Process the specified `event`.
/// Process the specified cluster state `event`.
///
/// THREAD: This method is invoked in the associated cluster's
/// dispatcher thread.
Expand Down
13 changes: 13 additions & 0 deletions src/groups/mqb/mqbblp/mqbblp_clusterstatemonitor.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,19 @@ struct TestHelper {
isActive ? mqbc::ElectorInfoLeaderStatus::e_ACTIVE
: mqbc::ElectorInfoLeaderStatus::e_PASSIVE;

if (d_cluster_mp->_clusterData()->electorInfo().leaderStatus() ==
mqbc::ElectorInfoLeaderStatus::e_UNDEFINED &&
status == mqbc::ElectorInfoLeaderStatus::e_ACTIVE) {
// It is **prohibited** to set leader status directly from
// e_UNDEFINED to e_ACTIVE, so we set it to e_PASSIVE first and
// then immediately to e_ACTIVE.
d_cluster_mp->_clusterData()->electorInfo().setElectorInfo(
mqbnet::ElectorState::e_LEADER,
1,
node,
mqbc::ElectorInfoLeaderStatus::e_PASSIVE);
}

d_cluster_mp->_clusterData()->electorInfo().setElectorInfo(
mqbnet::ElectorState::e_LEADER,
1,
Expand Down
2 changes: 0 additions & 2 deletions src/groups/mqb/mqbc/mqbc_clusterstateledger.h
Original file line number Diff line number Diff line change
Expand Up @@ -306,8 +306,6 @@ class ClusterStateLedger : public ElectorInfoObserver {
virtual int apply(const bdlbb::Blob& record,
mqbnet::ClusterNode* source) = 0;

virtual void setIsFirstLeaderAdvisory(bool isFirstLeaderAdvisory) = 0;

/// Set the commit callback to the specified `value`.
virtual void setCommitCb(const CommitCb& value) = 0;

Expand Down
8 changes: 0 additions & 8 deletions src/groups/mqb/mqbc/mqbc_clusterstateledger.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,6 @@ struct ClusterStateLedgerTestImp
return markDone();
}

void
setIsFirstLeaderAdvisory(BSLS_ANNOTATION_UNUSED bool isFirstLeaderAdvisory)
BSLS_KEYWORD_OVERRIDE
{
markDone();
}

// ACCESSORS
void setCommitCb(BSLS_ANNOTATION_UNUSED const CommitCb& value)
BSLS_KEYWORD_OVERRIDE
Expand Down Expand Up @@ -205,7 +198,6 @@ static void test1_clusterStateLedger_protocol()
apply(bmqp_ctrlmsg::LeaderAdvisory()));
BSLS_PROTOCOLTEST_ASSERT(testObj,
apply(bmqp_ctrlmsg::ClusterMessage()));
BSLS_PROTOCOLTEST_ASSERT(testObj, setIsFirstLeaderAdvisory(true));
BSLS_PROTOCOLTEST_ASSERT(
testObj,
setCommitCb(mqbc::ClusterStateLedger::CommitCb()));
Expand Down
55 changes: 17 additions & 38 deletions src/groups/mqb/mqbc/mqbc_clusterstatemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1746,35 +1746,6 @@ void ClusterStateManager::processClusterStateEvent(
bmqp::Event rawEvent(event.blob().get(), d_allocator_p);
BSLS_ASSERT_SAFE(rawEvent.isClusterStateEvent());

// NOTE: Any validation of the event would go here.
if (source != d_clusterData_p->electorInfo().leaderNode()) {
BALL_LOG_WARN << d_clusterData_p->identity().description()
<< ": ignoring cluster state event from cluster node "
<< source->nodeDescription() << " as this node is not "
<< "the current perceived leader. Current leader: ["
<< d_clusterData_p->electorInfo().leaderNodeId() << ": "
<< (d_clusterData_p->electorInfo().leaderNode()
? d_clusterData_p->electorInfo()
.leaderNode()
->nodeDescription()
: "* UNKNOWN *")
<< "]";
return; // RETURN
}
// 'source' is the perceived leader

// TBD: Suppress the following check for now, which will help some
// integration tests to pass. At this point, it is not clear if it is safe
// to process cluster state events while self is stopping.
//
// if ( bmqp_ctrlmsg::NodeStatus::E_STOPPING
// == d_clusterData_p->membership().selfNodeStatus()) {
// return; // RETURN
// }

// TODO: Validate the incoming advisory and potentially buffer it for later
// if the node is currently starting.

const int rc = d_clusterStateLedger_mp->apply(*rawEvent.blob(), source);
if (rc != 0) {
BALL_LOG_ERROR << d_clusterData_p->identity().description()
Expand Down Expand Up @@ -1878,14 +1849,16 @@ void ClusterStateManager::onNodeStopped()

// MANIPULATORS
// (virtual: mqbc::ElectorInfoObserver)
void ClusterStateManager::onClusterLeader(
mqbnet::ClusterNode* node,
BSLS_ANNOTATION_UNUSED ElectorInfoLeaderStatus::Enum status)
void ClusterStateManager::onClusterLeader(mqbnet::ClusterNode* node,
ElectorInfoLeaderStatus::Enum status)
{
// executed by the cluster *DISPATCHER* thread

// PRECONDITIONS
BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(d_cluster_p));
BSLS_ASSERT_SAFE(
(node && (ElectorInfoLeaderStatus::e_UNDEFINED != status)) ||
(!node && (ElectorInfoLeaderStatus::e_UNDEFINED == status)));

if (!node) {
// Leader lost
Expand All @@ -1899,21 +1872,27 @@ void ClusterStateManager::onClusterLeader(
return; // RETURN
}

// We can only reach this code path if leader `node` is not null. Here, we
// only care about transitioning from no leader to having a leader. We
// don't care about leader status changing between e_PASSIVE and e_ACTIVE.
if (d_clusterFSM.state() != ClusterFSM::State::e_UNKNOWN) {
return; // RETURN
}

// IMPORTANT: This is the main entry point to start running the Cluster FSM
InputMessages inputMessages(1, d_allocator_p);
inputMessages.at(0).setSource(node); // leader node

if (d_clusterData_p->membership().selfNode()->nodeId() == node->nodeId()) {
BALL_LOG_INFO << d_clusterData_p->identity().description() << ": "
<< (d_clusterFSM.isSelfLeader() ? "Re-" : "")
<< "Transitioning to leader in the Cluster FSM.";
BALL_LOG_INFO << d_clusterData_p->identity().description()
<< ": Transitioning to leader in the Cluster FSM.";

applyFSMEvent(ClusterFSM::Event::e_SLCT_LDR,
ClusterFSMEventMetadata(inputMessages));
}
else {
BALL_LOG_INFO << d_clusterData_p->identity().description() << ": "
<< (d_clusterFSM.isSelfFollower() ? "Re-" : "")
<< "Transitioning to follower in the Cluster FSM.";
BALL_LOG_INFO << d_clusterData_p->identity().description()
<< ": Transitioning to follower in the Cluster FSM.";

applyFSMEvent(ClusterFSM::Event::e_SLCT_FOL,
ClusterFSMEventMetadata(inputMessages));
Expand Down
2 changes: 1 addition & 1 deletion src/groups/mqb/mqbc/mqbc_clusterstatemanager.h
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ class ClusterStateManager BSLS_KEYWORD_FINAL
const bmqp_ctrlmsg::ControlMessage& message,
mqbnet::ClusterNode* source) BSLS_KEYWORD_OVERRIDE;

/// Process the specified `event`.
/// Process the specified cluster state `event`.
///
/// THREAD: This method is invoked in the associated cluster's
/// dispatcher thread.
Expand Down
9 changes: 6 additions & 3 deletions src/groups/mqb/mqbc/mqbc_clusterstatemanager.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ typedef mqbc::ClusterStateManager::ClusterStateLedgerMp ClusterStateLedgerMp;
struct Tester {
public:
// PUBLIC DATA
bool d_isLeader;
bdlbb::PooledBlobBufferFactory d_bufferFactory;
bmqu::TempDirectory d_tempDir;
bslma::ManagedPtr<mqbmock::Cluster> d_cluster_mp;
Expand All @@ -96,7 +97,8 @@ struct Tester {
public:
// CREATORS
Tester(bool isLeader)
: d_bufferFactory(1024, bmqtst::TestHelperUtil::allocator())
: d_isLeader(isLeader)
, d_bufferFactory(1024, bmqtst::TestHelperUtil::allocator())
, d_tempDir(bmqtst::TestHelperUtil::allocator())
, d_cluster_mp(0)
, d_clusterStateLedger_p(0)
Expand Down Expand Up @@ -140,7 +142,7 @@ struct Tester {
mqbmock::Cluster(&d_bufferFactory,
bmqtst::TestHelperUtil::allocator(),
true, // isClusterMember
isLeader,
d_isLeader,
true, // isCSLMode
true, // isFSMWorkflow
clusterNodeDefs,
Expand Down Expand Up @@ -211,7 +213,8 @@ struct Tester {
mqbmock::Cluster::k_LEADER_NODE_ID);
BSLS_ASSERT_OPT(leaderNode != 0);
d_cluster_mp->_clusterData()->electorInfo().setElectorInfo(
mqbnet::ElectorState::e_LEADER,
d_isLeader ? mqbnet::ElectorState::e_LEADER
: mqbnet::ElectorState::e_FOLLOWER,
electorTerm,
leaderNode,
mqbc::ElectorInfoLeaderStatus::e_PASSIVE);
Expand Down
13 changes: 7 additions & 6 deletions src/groups/mqb/mqbc/mqbc_clusterutil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,9 @@ void applyQueueUpdate(mqbc::ClusterState* clusterState,
<< clusterData.identity().description()
<< ": Received QueueUpdateAdvisory for known queue [uri: "
<< uri << "] with a mismatched queueKey "
<< "[expected: " << queueKey << ", received: " << queueKey
<< "]: " << queueUpdate << BMQTSK_ALARMLOG_END;
<< "[expected: " << cit->second->key()
<< ", received: " << queueKey << "]: " << queueUpdate
<< BMQTSK_ALARMLOG_END;
return; // RETURN
}
}
Expand Down Expand Up @@ -1213,10 +1214,10 @@ void ClusterUtil::registerAppId(ClusterData* clusterData,

if (mqbnet::ElectorState::e_LEADER !=
clusterData->electorInfo().electorState()) {
BALL_LOG_ERROR << clusterData->identity().description()
<< ": Failed to register appId '" << appId
<< "' for domain '" << domain->name()
<< "'. Self is not leader.";
BALL_LOG_WARN << clusterData->identity().description()
<< ": Not registering appId '" << appId
<< "' for domain '" << domain->name()
<< "'. Self is not leader.";
return; // RETURN
}

Expand Down
Loading

0 comments on commit 0d28d61

Please sign in to comment.