Skip to content

Commit

Permalink
ClusterStateQueueInfo::d_state mini-FSM
Browse files Browse the repository at this point in the history
Signed-off-by: dorjesinpo <[email protected]>
  • Loading branch information
dorjesinpo committed Nov 25, 2024
2 parents bcce77b + f2eb61b commit 01135f3
Show file tree
Hide file tree
Showing 8 changed files with 294 additions and 234 deletions.
20 changes: 9 additions & 11 deletions src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -386,9 +386,7 @@ ClusterQueueHelper::assignQueue(const QueueContextSp& queueContext)
// PRECONDITIONS
BSLS_ASSERT_SAFE(
d_cluster_p->dispatcher()->inDispatcherThread(d_cluster_p));
BSLS_ASSERT_SAFE(!isQueueAssigned(*(queueContext.get())) ||
((d_cluster_p->isCSLModeEnabled() &&
queueContext->d_stateQInfo_sp->pendingUnassignment())));
BSLS_ASSERT_SAFE(!isQueueAssigned(*queueContext));

if (d_cluster_p->isRemote()) {
// Assigning a queue in a remote, is simply giving it a new queueId.
Expand Down Expand Up @@ -4102,7 +4100,11 @@ void ClusterQueueHelper::onQueueUnassigned(

const QueueContextSp& queueContextSp = queueContextIt->second;
QueueLiveState& qinfo = queueContextSp->d_liveQInfo;
if (!isQueueAssigned(*queueContextSp)) {

mqbc::ClusterStateQueueInfo* assigned =
d_clusterState_p->getAssignedOrUnassigning(queueContextSp->uri());

if (assigned == 0) {
// Queue is known but not assigned. Error because it should not occur.
// Note that it may occur if self node is starting, received an
// open-queue request for this queue (and thus, populated 'd_queues'
Expand Down Expand Up @@ -4543,10 +4545,9 @@ void ClusterQueueHelper::openQueue(
// advisory are received). So to be safe, we explicitly attempt to
// assign the queue, which is a no-op in case there is no leader.

if (!isQueueAssigned(*(queueContextIt->second)) ||
isQueuePendingUnassignment(*(queueContextIt->second))) {
if (!isQueueAssigned(*(queueContextIt->second))) {
// In CSL, unassignment is async.
// Since QueueUnassignemntAdvisory can contain multipe queues,
// Since QueueUnassignmentAdvisory can contain multiple queues,
// canceling pending Advisory is not an option.
// Instead, initiate new QueueAssignemntAdvisory which must
// take effect after old QueueUnassignemntAdvisory.
Expand Down Expand Up @@ -6145,10 +6146,7 @@ int ClusterQueueHelper::gcExpiredQueues(bool immediate)
<< "], queueKey [" << keyCopy << "] assigned to "
<< "Partition [" << pid << "] as it has expired.";

mqbc::ClusterUtil::setPendingUnassignment(d_clusterState_p,
uriCopy,
true);

mqbc::ClusterUtil::setPendingUnassignment(d_clusterState_p, uriCopy);
// Populate 'queueUnassignedAdvisory'
bdlma::LocalSequentialAllocator<1024> localAlloc(d_allocator_p);
bmqp_ctrlmsg::ControlMessage controlMsg(&localAlloc);
Expand Down
26 changes: 5 additions & 21 deletions src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -909,10 +909,6 @@ class ClusterQueueHelper : public mqbc::ClusterStateObserver,
/// assigned.
bool isQueueAssigned(const QueueContext& queueContext) const;

/// Return true if the queue in the specified `queueContext` is
/// pending unassignment (async in CSL).
bool isQueuePendingUnassignment(const QueueContext& queueContext) const;

/// Return true if the queue in the specified `queueContext` is assigned
/// and its associated primary is AVAILABLE and is different from the
/// optionally specified `otherThan`.
Expand Down Expand Up @@ -1239,31 +1235,19 @@ ClusterQueueHelper::isQueueAssigned(const QueueContext& queueContext) const
bmqp::QueueId::k_UNASSIGNED_QUEUE_ID; // RETURN
}

DomainStatesCIter domCit = d_clusterState_p->domainStates().find(
queueContext.uri().qualifiedDomain());
if (domCit == d_clusterState_p->domainStates().cend()) {
return false; // RETURN
}

UriToQueueInfoMapCIter qCit = domCit->second->queuesInfo().find(
mqbc::ClusterStateQueueInfo* assigned = d_clusterState_p->getAssigned(
queueContext.uri());
if (qCit == domCit->second->queuesInfo().cend()) {

if (assigned == 0) {
return false; // RETURN
}

BSLS_ASSERT_SAFE(qCit->second->partitionId() !=
BSLS_ASSERT_SAFE(assigned->partitionId() !=
mqbs::DataStore::k_INVALID_PARTITION_ID &&
!qCit->second->key().isNull());
!assigned->key().isNull());
return true;
}

inline bool ClusterQueueHelper::isQueuePendingUnassignment(
const QueueContext& queueContext) const
{
const ClusterStateQueueInfoCSp& state = queueContext.d_stateQInfo_sp;
return state ? state->pendingUnassignment() : false;
}

inline bool ClusterQueueHelper::isQueuePrimaryAvailable(
const QueueContext& queueContext,
mqbnet::ClusterNode* otherThan) const
Expand Down
209 changes: 95 additions & 114 deletions src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1601,109 +1601,99 @@ void ClusterStateManager::processQueueAssignmentAdvisory(
mqbu::StorageKey::BinaryRepresentation(),
queueInfo.key().data());

bool queueAlreadyAssigned = false;
const DomainStatesCIter domCit = d_state_p->domainStates().find(
uri.qualifiedDomain());
if (domCit != d_state_p->domainStates().cend()) {
UriToQueueInfoMapCIter qcit = domCit->second->queuesInfo().find(
uri);
if (qcit != domCit->second->queuesInfo().cend()) {
// Queue is assigned. Verify that the key and partition match
// with what we already have.
queueAlreadyAssigned = true;

if (qcit->second->partitionId() != queueInfo.partitionId() ||
(qcit->second->key() != queueKey)) {
if (!delayed) {
// Leader is telling self node to map a queue to new
// partition or have a new key (basically, its a new
// incarnation of the queue). This could occur when a
// queue is being opened-closed-opened in very quick
// succession. Old instance of the queue is deleted by
// the primary, primary broadcasts queue-unasssignment
// advisory, leader broadcasts queue-assignment
// advisory for the new instance of the queue, but self
// node receives those 2 broadcasts out of order
// (leader's advisory followed by primary's advisory).
// In this case, its beneficial to force-update self's
// view of the queue with what the leader is
// advertising (with an error). When self receives
// queue-unassignment advisory from the primary for the
// old instance of the queue, it will log an error and
// ignore it.

BALL_LOG_ERROR
<< d_cluster_p->description() << ": "
<< "received queueAssignmentAdvisory from leader '"
<< source->nodeDescription() << "' for a known and"
<< " assigned queue with different "
<< "partitionId/key: [received: " << queueInfo
<< ", knownPartitionId: "
<< qcit->second->partitionId()
<< ", knownQueueKey: " << qcit->second->key()
<< "]";
}
else {
// There is partitionId/queueKey mismatch and this is a
// delayed (aka, buffered) advisory. This is a valid
// scenario. Here's how: Node starts up, initiates
// storage sync with the primary While recovery is
// underway, a queue, which is active, is deleted and
// unassigned by the primary. Further, same queue is
// opened again, which means leader may assign it to a
// different partition, and will definitely assign it a
// different queue key, and will issue a queue
// assignment advisory. But self will buffer it. When
// recovery is complete, self's storage manager will
// apply all recovered queues (including the previous
// incarnation of this queue) to self's cluster state
// (via 'ClusterStateManager::registerQueueInfo'), and
// thus, populate 'd_queues', and this is how we will
// end up here. So instead of alarming/asserting, we
// simply log at warn, and overwrite current state with
// the buffered (this) advisory and move on.

BALL_LOG_WARN
<< d_cluster_p->description()
<< ": overwriting current known queue state "
<< "with the buffered advisory for queue ["
<< qcit->second->uri()
<< "]. Current assigned Partition ["
<< qcit->second->partitionId()
<< "], current queueKey [" << qcit->second->key()
<< "], new Partition [" << queueInfo.partitionId()
<< "], new queueKey [" << queueKey << "].";
}

// Remove existing state, mapping, etc.

d_state_p->queueKeys().erase(qcit->second->key());

// no need to update d_state_p->domainStates() entry
// , queue was already known and registered
AppInfos appIdInfos(d_allocator_p);

mqbc::ClusterUtil::parseQueueInfo(&appIdInfos,
queueInfo,
d_allocator_p);

BSLA_MAYBE_UNUSED const bool rc = d_state_p->assignQueue(
uri,
queueKey,
queueInfo.partitionId(),
appIdInfos);
BSLS_ASSERT_SAFE(rc == false);
mqbc::ClusterStateQueueInfo* assigned =
d_state_p->getAssignedOrUnassigning(uri);

if (assigned) {
// Queue is assigned. Verify that the key and partition match
// with what we already have.

if (assigned->partitionId() != queueInfo.partitionId() ||
(assigned->key() != queueKey)) {
if (!delayed) {
// Leader is telling self node to map a queue to new
// partition or have a new key (basically, its a new
// incarnation of the queue). This could occur when a
// queue is being opened-closed-opened in very quick
// succession. Old instance of the queue is deleted by
// the primary, primary broadcasts queue-unasssignment
// advisory, leader broadcasts queue-assignment
// advisory for the new instance of the queue, but self
// node receives those 2 broadcasts out of order
// (leader's advisory followed by primary's advisory).
// In this case, its beneficial to force-update self's
// view of the queue with what the leader is
// advertising (with an error). When self receives
// queue-unassignment advisory from the primary for the
// old instance of the queue, it will log an error and
// ignore it.

BALL_LOG_ERROR
<< d_cluster_p->description() << ": "
<< "received queueAssignmentAdvisory from leader '"
<< source->nodeDescription() << "' for a known and"
<< " assigned queue with different "
<< "partitionId/key: [received: " << queueInfo
<< ", knownPartitionId: " << assigned->partitionId()
<< ", knownQueueKey: " << assigned->key() << "]";
}
else {
// Queue is assigned, and there is no partitionId/queueKey
// mismatch. So this assert should not fire.
BSLS_ASSERT_SAFE(1 ==
d_state_p->queueKeys().count(queueKey));
// There is partitionId/queueKey mismatch and this is a
// delayed (aka, buffered) advisory. This is a valid
// scenario. Here's how: Node starts up, initiates
// storage sync with the primary While recovery is
// underway, a queue, which is active, is deleted and
// unassigned by the primary. Further, same queue is
// opened again, which means leader may assign it to a
// different partition, and will definitely assign it a
// different queue key, and will issue a queue
// assignment advisory. But self will buffer it. When
// recovery is complete, self's storage manager will
// apply all recovered queues (including the previous
// incarnation of this queue) to self's cluster state
// (via 'ClusterStateManager::registerQueueInfo'), and
// thus, populate 'd_queues', and this is how we will
// end up here. So instead of alarming/asserting, we
// simply log at warn, and overwrite current state with
// the buffered (this) advisory and move on.

BALL_LOG_WARN
<< d_cluster_p->description()
<< ": overwriting current known queue state "
<< "with the buffered advisory for queue ["
<< assigned->uri() << "]. Current assigned Partition ["
<< assigned->partitionId() << "], current queueKey ["
<< assigned->key() << "], new Partition ["
<< queueInfo.partitionId() << "], new queueKey ["
<< queueKey << "].";
}

// Remove existing state, mapping, etc.

d_state_p->queueKeys().erase(assigned->key());

// no need to update d_state_p->domainStates() entry
// , queue was already known and registered
AppInfos appIdInfos(d_allocator_p);

mqbc::ClusterUtil::parseQueueInfo(&appIdInfos,
queueInfo,
d_allocator_p);

BSLA_MAYBE_UNUSED const bool rc = d_state_p->assignQueue(
uri,
queueKey,
queueInfo.partitionId(),
appIdInfos);
BSLS_ASSERT_SAFE(rc == false);
}
else {
// Queue is assigned, and there is no partitionId/queueKey
// mismatch. So this assert should not fire.
BSLS_ASSERT_SAFE(1 == d_state_p->queueKeys().count(queueKey));
}
}

if (!queueAlreadyAssigned) {
else {
AppInfos appIdInfos(d_allocator_p);

mqbc::ClusterUtil::parseQueueInfo(&appIdInfos,
Expand Down Expand Up @@ -1865,19 +1855,10 @@ void ClusterStateManager::processQueueUnAssignmentAdvisory(
mqbu::StorageKey key(mqbu::StorageKey::BinaryRepresentation(),
queueInfo.key().data());

bool hasQueue = true;
const DomainStatesCIter domCit = d_state_p->domainStates().find(
uri.qualifiedDomain());
if (domCit == d_state_p->domainStates().cend()) {
hasQueue = false;
}
const UriToQueueInfoMapCIter qcit = domCit->second->queuesInfo().find(
uri);
if (qcit == domCit->second->queuesInfo().cend()) {
hasQueue = false;
}
mqbc::ClusterStateQueueInfo* assigned =
d_state_p->getAssignedOrUnassigning(uri);

if (!hasQueue) {
if (assigned == 0) {
// Queue is not assigned. Error because it should not occur.

BALL_LOG_ERROR << d_cluster_p->description()
Expand All @@ -1892,8 +1873,8 @@ void ClusterStateManager::processQueueUnAssignmentAdvisory(
// Self node sees queue as assigned. Validate that the key/partition
// from the unassignment match the internal state.

if ((qcit->second->partitionId() != advisory.partitionId()) ||
(qcit->second->key() != key)) {
if ((assigned->partitionId() != advisory.partitionId()) ||
(assigned->key() != key)) {
// This can occur if a queue is deleted by the primary and created
// immediately by the client. Primary broadcasts queue
// unassignment advisory upon deleting old instance of the queue,
Expand All @@ -1915,8 +1896,8 @@ void ClusterStateManager::processQueueUnAssignmentAdvisory(
<< advisory.partitionId()
<< ", advisoryKey: " << key
<< ", internalPartitionId: "
<< qcit->second->partitionId()
<< ", internalKey: " << qcit->second->key() << "]";
<< assigned->partitionId()
<< ", internalKey: " << assigned->key() << "]";
continue; // CONTINUE
}
d_state_p->unassignQueue(uri);
Expand Down
Loading

0 comments on commit 01135f3

Please sign in to comment.