From 263600c86a727686016c01d1d33c7ca38dc62c1a Mon Sep 17 00:00:00 2001 From: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com> Date: Sat, 7 Dec 2024 12:49:59 -0500 Subject: [PATCH] Addressing review; fixing bad merge Signed-off-by: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com> --- .../mqb/mqbblp/mqbblp_clusterqueuehelper.cpp | 145 +++++++----- .../mqb/mqbblp/mqbblp_clusterqueuehelper.h | 34 +-- .../mqb/mqbblp/mqbblp_clusterstatemanager.cpp | 210 ++++++++---------- src/groups/mqb/mqbblp/mqbblp_domain.cpp | 8 +- src/groups/mqb/mqbblp/mqbblp_domain.h | 3 +- src/groups/mqb/mqbc/mqbc_clusterstate.cpp | 76 ++++--- src/groups/mqb/mqbc/mqbc_clusterstate.h | 105 ++++++--- src/groups/mqb/mqbc/mqbc_clusterutil.cpp | 78 ++++--- src/groups/mqb/mqbc/mqbc_clusterutil.h | 8 +- .../mqbc/mqbc_incoreclusterstateledger.cpp | 12 +- 10 files changed, 379 insertions(+), 300 deletions(-) diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp index 1c4030591..b457cf04b 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp @@ -214,7 +214,7 @@ ClusterQueueHelper::QueueLiveState::QueueLiveState( } // MANIPULATORS -void ClusterQueueHelper::QueueLiveState::resetAndKeepPending() +void ClusterQueueHelper::QueueLiveState::resetButKeepPending() { // NOTE: Do not reset d_pending and d_inFlight, and some other data. @@ -386,8 +386,7 @@ ClusterQueueHelper::assignQueue(const QueueContextSp& queueContext) // PRECONDITIONS BSLS_ASSERT_SAFE( d_cluster_p->dispatcher()->inDispatcherThread(d_cluster_p)); - BSLS_ASSERT_SAFE(!isQueueAssigned(*queueContext) || - queueContext->d_stateQInfo_sp->pendingUnassignment()); + BSLS_ASSERT_SAFE(!isQueueAssigned(*queueContext)); if (d_cluster_p->isRemote()) { // Assigning a queue in a remote, is simply giving it a new queueId. 
@@ -3928,7 +3927,7 @@ void ClusterQueueHelper::onClusterLeader( } void ClusterQueueHelper::onQueueAssigned( - const mqbc::ClusterStateQueueInfo& info) + const bsl::shared_ptr& info) { // executed by the cluster *DISPATCHER* thread @@ -3936,6 +3935,7 @@ void ClusterQueueHelper::onQueueAssigned( BSLS_ASSERT_SAFE( d_cluster_p->dispatcher()->inDispatcherThread(d_cluster_p)); BSLS_ASSERT_SAFE(!d_cluster_p->isRemote()); + BSLS_ASSERT_SAFE(info); const mqbnet::ClusterNode* leaderNode = d_clusterData_p->electorInfo().leaderNode(); @@ -3944,7 +3944,8 @@ void ClusterQueueHelper::onQueueAssigned( : "** UNKNOWN**"; QueueContextSp queueContext; - QueueContextMapIter queueContextIt = d_queues.find(info.uri()); + QueueContextMapIter queueContextIt = d_queues.find(info->uri()); + if (queueContextIt != d_queues.end()) { // We already have a queueContext created for that queue queueContext = queueContextIt->second; @@ -3955,10 +3956,10 @@ void ClusterQueueHelper::onQueueAssigned( // partitionId/queueKey mismatch. And d_queueKeys must also // contain the key. BSLS_ASSERT_SAFE( - (queueContext->partitionId() == info.partitionId()) && - (queueContext->key() == info.key())); + (queueContext->partitionId() == info->partitionId()) && + (queueContext->key() == info->key())); BSLS_ASSERT_SAFE(1 == - d_clusterState_p->queueKeys().count(info.key())); + d_clusterState_p->queueKeys().count(info->key())); BSLS_ASSERT_SAFE( !queueContext->d_stateQInfo_sp->pendingUnassignment()); @@ -3967,7 +3968,7 @@ void ClusterQueueHelper::onQueueAssigned( return; // RETURN } else { - if (1 == d_clusterState_p->queueKeys().count(info.key())) { + if (1 == d_clusterState_p->queueKeys().count(info->key())) { // Self node's queue context is unaware of the assigned queue, // but queueKey specified in the advisory is present in the // 'queueKeys' data structure. 
@@ -3976,8 +3977,8 @@ void ClusterQueueHelper::onQueueAssigned( << d_cluster_p->description() << ": attempting to apply queue assignment for a known but" << " unassigned queue, but queueKey is not unique. " - << "QueueKey [" << info.key() << "], URI [" << info.uri() - << "], Partition [" << info.partitionId() + << "QueueKey [" << info->key() << "], URI [" << info->uri() + << "], Partition [" << info->partitionId() << "]. Current leader is: '" << leaderDescription << "'. Ignoring this entry in the advisory." << BMQTSK_ALARMLOG_END; @@ -3985,17 +3986,16 @@ void ClusterQueueHelper::onQueueAssigned( } // Update queue's mapping etc. - mqbc::ClusterState::QueueKeysInsertRc insertRc = - d_clusterState_p->queueKeys().insert(info.key()); + BSLA_MAYBE_UNUSED mqbc::ClusterState::QueueKeysInsertRc insertRc = + d_clusterState_p->queueKeys().insert(info->key()); BSLS_ASSERT_SAFE(insertRc.second); - (void)insertRc.second; } } else { // First time hearing about this queue. Update 'queueKeys' and // ensure that queue key is unique. mqbc::ClusterState::QueueKeysInsertRc insertRc = - d_clusterState_p->queueKeys().insert(info.key()); + d_clusterState_p->queueKeys().insert(info->key()); if (false == insertRc.second) { // QueueKey is not unique. @@ -4003,8 +4003,8 @@ void ClusterQueueHelper::onQueueAssigned( BMQTSK_ALARMLOG_ALARM("CLUSTER_STATE") << d_cluster_p->description() << ": attempting to apply queue assignment for an unknown " - << "queue [" << info.uri() << "] assigned to Partition [" - << info.partitionId() << "], but queueKey [" << info.key() + << "queue [" << info->uri() << "] assigned to Partition [" + << info->partitionId() << "], but queueKey [" << info->key() << "] is not unique. Current leader is: '" << leaderDescription << "'. Ignoring this assignment." << BMQTSK_ALARMLOG_END; return; // RETURN @@ -4012,17 +4012,17 @@ void ClusterQueueHelper::onQueueAssigned( // Create the queueContext. 
queueContext.reset(new (*d_allocator_p) - QueueContext(info.uri(), d_allocator_p), + QueueContext(info->uri(), d_allocator_p), d_allocator_p); - d_queues[info.uri()] = queueContext; + d_queues[info->uri()] = queueContext; } mqbc::ClusterState::DomainState& domainState = - *d_clusterState_p->domainStates().at(info.uri().qualifiedDomain()); + *d_clusterState_p->domainStates().at(info->uri().qualifiedDomain()); domainState.adjustQueueCount(1); - queueContext->d_stateQInfo_sp = domainState.queuesInfo().at(info.uri()); + queueContext->d_stateQInfo_sp = info; // Queue assignment from the leader is honored per the info updated // above @@ -4033,7 +4033,7 @@ void ClusterQueueHelper::onQueueAssigned( // replica nodes when they receive a queue creation record from the primary // in the partition stream. if (d_cluster_p->isCSLModeEnabled()) { - if (!d_clusterState_p->isSelfPrimary(info.partitionId())) { + if (!d_clusterState_p->isSelfPrimary(info->partitionId())) { // This is a replica node // Note: It's possible that the queue has already been registered @@ -4041,21 +4041,21 @@ void ClusterQueueHelper::onQueueAssigned( // recovery. Therefore, we will allow for duplicate registration // which will simply result in a no-op. 
d_storageManager_p->registerQueueReplica( - info.partitionId(), - info.uri(), - info.key(), + info->partitionId(), + info->uri(), + info->key(), d_clusterState_p->domainStates() - .at(info.uri().qualifiedDomain()) + .at(info->uri().qualifiedDomain()) ->domain(), true); // allowDuplicate d_storageManager_p->updateQueueReplica( - info.partitionId(), - info.uri(), - info.key(), - info.appInfos(), + info->partitionId(), + info->uri(), + info->key(), + info->appInfos(), d_clusterState_p->domainStates() - .at(info.uri().qualifiedDomain()) + .at(info->uri().qualifiedDomain()) ->domain(), true); // allowDuplicate } @@ -4068,7 +4068,7 @@ void ClusterQueueHelper::onQueueAssigned( } void ClusterQueueHelper::onQueueUnassigned( - const mqbc::ClusterStateQueueInfo& info) + const bsl::shared_ptr& info) { // executed by the cluster *DISPATCHER* thread @@ -4076,11 +4076,12 @@ void ClusterQueueHelper::onQueueUnassigned( BSLS_ASSERT_SAFE( d_cluster_p->dispatcher()->inDispatcherThread(d_cluster_p)); BSLS_ASSERT_SAFE(!d_cluster_p->isRemote()); + BSLS_ASSERT_SAFE(info); const bsl::string& leaderDesc = d_clusterData_p->electorInfo().leaderNode()->nodeDescription(); - const QueueContextMapIter queueContextIt = d_queues.find(info.uri()); + const QueueContextMapIter queueContextIt = d_queues.find(info->uri()); if (queueContextIt == d_queues.end()) { // We don't know about that uri .. nothing to do, but error because // it should not happen. @@ -4092,7 +4093,8 @@ void ClusterQueueHelper::onQueueUnassigned( << ": Ignoring queue unassignment from leader " << leaderDesc << ", for unknown queue: " << info; - BSLS_ASSERT_SAFE(0 == d_clusterState_p->queueKeys().count(info.key())); + BSLS_ASSERT_SAFE(0 == + d_clusterState_p->queueKeys().count(info->key())); // Since queue uri is unknown to self node, queue key should be // unknown too. 
@@ -4101,7 +4103,11 @@ void ClusterQueueHelper::onQueueUnassigned( const QueueContextSp& queueContextSp = queueContextIt->second; QueueLiveState& qinfo = queueContextSp->d_liveQInfo; - if (!isQueueAssigned(*queueContextSp)) { + + mqbc::ClusterStateQueueInfo* assigned = + d_clusterState_p->getAssignedOrUnassigning(queueContextSp->uri()); + + if (assigned == 0) { // Queue is known but not assigned. Error because it should not occur. // Note that it may occur if self node is starting, received an // open-queue request for this queue (and thus, populated 'd_queues' @@ -4114,8 +4120,8 @@ void ClusterQueueHelper::onQueueUnassigned( << " because self node sees queue as unassigned."; return; // RETURN } - BSLS_ASSERT_SAFE(queueContextSp->partitionId() == info.partitionId() && - queueContextSp->key() == info.key()); + BSLS_ASSERT_SAFE(queueContextSp->partitionId() == info->partitionId() && + queueContextSp->key() == info->key()); if (0 != qinfo.d_numQueueHandles) { // This could occur if destruction of a handle at self node is delayed @@ -4129,14 +4135,14 @@ void ClusterQueueHelper::onQueueUnassigned( << qinfo.d_numQueueHandles << "]."; } - if (d_clusterState_p->isSelfPrimary(info.partitionId())) { + if (d_clusterState_p->isSelfPrimary(info->partitionId())) { // openQueue while queue unassigning cancels the unassigning // so we can safely delete it from the various maps. removeQueueRaw(queueContextIt); // Unregister the queue/storage from the partition, which will end up // issuing a QueueDeletion record. Note that this method is async. - d_storageManager_p->unregisterQueue(info.uri(), info.partitionId()); + d_storageManager_p->unregisterQueue(info->uri(), info->partitionId()); } else { // This is a replica node. 
@@ -4170,16 +4176,14 @@ void ClusterQueueHelper::onQueueUnassigned( if (queueContextSp->d_liveQInfo.d_queue_sp) { d_clusterState_p->updatePartitionNumActiveQueues( - info.partitionId(), + info->partitionId(), -1); } d_queuesById.erase(qinfo.d_id); - qinfo.resetAndKeepPending(); + qinfo.resetButKeepPending(); // CQH will recreate 'queueContextSp->d_liveQInfo.d_queue_sp' upon // 'onOpenQueueResponse' - // We do this in CSL mode only, such that isQueueAssigned() will - // return false. queueContextSp->d_stateQInfo_sp.reset(); } else { @@ -4194,15 +4198,18 @@ void ClusterQueueHelper::onQueueUnassigned( // Note: In non-CSL mode, the queue deletion callback is instead // invoked at nodes when they receive a queue deletion record from the // primary in the partition stream. - d_storageManager_p->unregisterQueueReplica(info.partitionId(), - info.uri(), - info.key(), - mqbu::StorageKey()); + + if (d_cluster_p->isCSLModeEnabled()) { + d_storageManager_p->unregisterQueueReplica(info->partitionId(), + info->uri(), + info->key(), + mqbu::StorageKey()); + } } - d_clusterState_p->queueKeys().erase(info.key()); + d_clusterState_p->queueKeys().erase(info->key()); d_clusterState_p->domainStates() - .at(info.uri().qualifiedDomain()) + .at(info->uri().qualifiedDomain()) ->adjustQueueCount(-1); BALL_LOG_INFO << d_cluster_p->description() @@ -4449,6 +4456,43 @@ void ClusterQueueHelper::openQueue( QueueContextMapIter queueContextIt = d_queues.find(uriKey); + // NOTE: See TBD in 'onGetDomainDispatched': if the request comes from a + // peer inside the cluster, 'clientIdentity' will represent our own + // identity instead of that of the peer; which is obviously wrong; + // however, here we only want to use it to determine whether the + // request comes from a peer node in the cluster (and not a client or + // a proxy broker), and so this is still fine. 
+ if (clientContext->identity().clientType() == + bmqp_ctrlmsg::ClientType::E_TCPBROKER && + !clientContext->identity().clusterName().empty() && + clientContext->identity().clusterNodeId() != + mqbnet::Cluster::k_INVALID_NODE_ID) { + // The request came from a peer in the cluster, make sure we are the + // primary for the partition. Since we received the openQueue request + // from a in-cluster peer node, we should have already received a queue + // advisory assignment from the leader about that queue; however maybe + // events will come out of order, so just return a NOT_PRIMARY + // retryable error in this case and let the peer re-emit a request. + bsl::string reason; + mqbi::ClusterErrorCode::Enum errorCode = + mqbi::ClusterErrorCode::e_UNKNOWN; + if (queueContextIt == d_queues.end()) { + reason = "Not aware of that queue"; + errorCode = mqbi::ClusterErrorCode::e_UNKNOWN_QUEUE; + CALLBACK_FAILURE(reason, errorCode); + return; // RETURN + } + const int pid = queueContextIt->second->partitionId(); + if (!isSelfAvailablePrimary(pid)) { + bmqu::MemOutStream errorDesc; + errorDesc << "Not the primary for partitionId [" << pid << "]"; + reason = errorDesc.str(); + errorCode = mqbi::ClusterErrorCode::e_NOT_PRIMARY; + CALLBACK_FAILURE(reason, errorCode); + return; // RETURN + } + } + // Create an OpenQueue context for that request. OpenQueueContext context; context.d_domain_p = domain; @@ -4542,8 +4586,7 @@ void ClusterQueueHelper::openQueue( // advisory are received). So to be safe, we explicitly attempt to // assign the queue, which is a no-op in case there is no leader. - if (!isQueueAssigned(*(queueContextIt->second)) || - isQueuePendingUnassignment(*(queueContextIt->second))) { + if (!isQueueAssigned(*(queueContextIt->second))) { // In CSL, unassignment is async. // Since QueueUnassignmentAdvisory can contain multiple queues, // canceling pending Advisory is not an option. 
diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h index 65df42508..82e966108 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h +++ b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h @@ -285,7 +285,7 @@ class ClusterQueueHelper BSLS_KEYWORD_FINAL /// Reset the `id`, `partitionId`, `key` and `queue` members of this /// object. Note that `uri` is left untouched because it is an /// invariant member of a given instance of such a QueueInfo object. - void resetAndKeepPending(); + void resetButKeepPending(); }; struct StopContext { @@ -910,10 +910,6 @@ class ClusterQueueHelper BSLS_KEYWORD_FINAL /// assigned. bool isQueueAssigned(const QueueContext& queueContext) const; - /// Return true if the queue in the specified `queueContext` is - /// pending unassignment (async in CSL). - bool isQueuePendingUnassignment(const QueueContext& queueContext) const; - /// Return true if the queue in the specified `queueContext` is assigned /// and its associated primary is AVAILABLE and is different from the /// optionally specified `otherThan`. @@ -959,7 +955,8 @@ class ClusterQueueHelper BSLS_KEYWORD_FINAL /// /// THREAD: This method is invoked in the associated cluster's /// dispatcher thread. - void onQueueAssigned(const mqbc::ClusterStateQueueInfo& info) + void + onQueueAssigned(const bsl::shared_ptr& info) BSLS_KEYWORD_OVERRIDE; /// Callback invoked when a queue with the specified `info` gets @@ -967,7 +964,8 @@ class ClusterQueueHelper BSLS_KEYWORD_FINAL /// /// THREAD: This method is invoked in the associated cluster's /// dispatcher thread. 
- void onQueueUnassigned(const mqbc::ClusterStateQueueInfo& info) + void + onQueueUnassigned(const bsl::shared_ptr& info) BSLS_KEYWORD_OVERRIDE; /// Callback invoked when a queue with the specified `uri` belonging to @@ -1240,31 +1238,19 @@ ClusterQueueHelper::isQueueAssigned(const QueueContext& queueContext) const bmqp::QueueId::k_UNASSIGNED_QUEUE_ID; // RETURN } - DomainStatesCIter domCit = d_clusterState_p->domainStates().find( - queueContext.uri().qualifiedDomain()); - if (domCit == d_clusterState_p->domainStates().cend()) { - return false; // RETURN - } - - UriToQueueInfoMapCIter qCit = domCit->second->queuesInfo().find( + mqbc::ClusterStateQueueInfo* assigned = d_clusterState_p->getAssigned( queueContext.uri()); - if (qCit == domCit->second->queuesInfo().cend()) { + + if (assigned == 0) { return false; // RETURN } - BSLS_ASSERT_SAFE(qCit->second->partitionId() != + BSLS_ASSERT_SAFE(assigned->partitionId() != mqbs::DataStore::k_INVALID_PARTITION_ID && - !qCit->second->key().isNull()); + !assigned->key().isNull()); return true; } -inline bool ClusterQueueHelper::isQueuePendingUnassignment( - const QueueContext& queueContext) const -{ - const ClusterStateQueueInfoCSp& state = queueContext.d_stateQInfo_sp; - return state ? 
state->pendingUnassignment() : false; -} - inline bool ClusterQueueHelper::isQueuePrimaryAvailable( const QueueContext& queueContext, mqbnet::ClusterNode* otherThan) const diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.cpp b/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.cpp index 77f9835af..16fdfcc0b 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.cpp @@ -1601,109 +1601,99 @@ void ClusterStateManager::processQueueAssignmentAdvisory( mqbu::StorageKey::BinaryRepresentation(), queueInfo.key().data()); - bool queueAlreadyAssigned = false; - const DomainStatesCIter domCit = d_state_p->domainStates().find( - uri.qualifiedDomain()); - if (domCit != d_state_p->domainStates().cend()) { - UriToQueueInfoMapCIter qcit = domCit->second->queuesInfo().find( - uri); - if (qcit != domCit->second->queuesInfo().cend()) { - // Queue is assigned. Verify that the key and partition match - // with what we already have. - queueAlreadyAssigned = true; - - if (qcit->second->partitionId() != queueInfo.partitionId() || - (qcit->second->key() != queueKey)) { - if (!delayed) { - // Leader is telling self node to map a queue to new - // partition or have a new key (basically, its a new - // incarnation of the queue). This could occur when a - // queue is being opened-closed-opened in very quick - // succession. Old instance of the queue is deleted by - // the primary, primary broadcasts queue-unasssignment - // advisory, leader broadcasts queue-assignment - // advisory for the new instance of the queue, but self - // node receives those 2 broadcasts out of order - // (leader's advisory followed by primary's advisory). - // In this case, its beneficial to force-update self's - // view of the queue with what the leader is - // advertising (with an error). When self receives - // queue-unassignment advisory from the primary for the - // old instance of the queue, it will log an error and - // ignore it. 
- - BALL_LOG_ERROR - << d_cluster_p->description() << ": " - << "received queueAssignmentAdvisory from leader '" - << source->nodeDescription() << "' for a known and" - << " assigned queue with different " - << "partitionId/key: [received: " << queueInfo - << ", knownPartitionId: " - << qcit->second->partitionId() - << ", knownQueueKey: " << qcit->second->key() - << "]"; - } - else { - // There is partitionId/queueKey mismatch and this is a - // delayed (aka, buffered) advisory. This is a valid - // scenario. Here's how: Node starts up, initiates - // storage sync with the primary While recovery is - // underway, a queue, which is active, is deleted and - // unassigned by the primary. Further, same queue is - // opened again, which means leader may assign it to a - // different partition, and will definitely assign it a - // different queue key, and will issue a queue - // assignment advisory. But self will buffer it. When - // recovery is complete, self's storage manager will - // apply all recovered queues (including the previous - // incarnation of this queue) to self's cluster state - // (via 'ClusterStateManager::registerQueueInfo'), and - // thus, populate 'd_queues', and this is how we will - // end up here. So instead of alarming/asserting, we - // simply log at warn, and overwrite current state with - // the buffered (this) advisory and move on. - - BALL_LOG_WARN - << d_cluster_p->description() - << ": overwriting current known queue state " - << "with the buffered advisory for queue [" - << qcit->second->uri() - << "]. Current assigned Partition [" - << qcit->second->partitionId() - << "], current queueKey [" << qcit->second->key() - << "], new Partition [" << queueInfo.partitionId() - << "], new queueKey [" << queueKey << "]."; - } - - // Remove existing state, mapping, etc. 
- - d_state_p->queueKeys().erase(qcit->second->key()); - - // no need to update d_state_p->domainStates() entry - // , queue was already known and registered - AppInfos appIdInfos(d_allocator_p); - - mqbc::ClusterUtil::parseQueueInfo(&appIdInfos, - queueInfo, - d_allocator_p); - - BSLA_MAYBE_UNUSED const bool rc = d_state_p->assignQueue( - uri, - queueKey, - queueInfo.partitionId(), - appIdInfos); - BSLS_ASSERT_SAFE(rc == false); + mqbc::ClusterStateQueueInfo* assigned = d_state_p->getAssigned(uri); + // Only Replica can `processQueueAssignmentAdvisory`. Therefore, the + // state cannot be `k_UNASSIGNING` + + if (assigned) { + // Queue is assigned. Verify that the key and partition match + // with what we already have. + + if (assigned->partitionId() != queueInfo.partitionId() || + (assigned->key() != queueKey)) { + if (!delayed) { + // Leader is telling self node to map a queue to new + // partition or have a new key (basically, it's a new + // incarnation of the queue). This could occur when a + // queue is being opened-closed-opened in very quick + // succession. Old instance of the queue is deleted by + // the primary, primary broadcasts queue-unassignment + // advisory, leader broadcasts queue-assignment + // advisory for the new instance of the queue, but self + // node receives those 2 broadcasts out of order + // (leader's advisory followed by primary's advisory). + // In this case, it's beneficial to force-update self's + // view of the queue with what the leader is + // advertising (with an error). When self receives + // queue-unassignment advisory from the primary for the + // old instance of the queue, it will log an error and + // ignore it.
+ + BALL_LOG_ERROR + << d_cluster_p->description() << ": " + << "received queueAssignmentAdvisory from leader '" + << source->nodeDescription() << "' for a known and" + << " assigned queue with different " + << "partitionId/key: [received: " << queueInfo + << ", knownPartitionId: " << assigned->partitionId() + << ", knownQueueKey: " << assigned->key() << "]"; } else { - // Queue is assigned, and there is no partitionId/queueKey - // mismatch. So this assert should not fire. - BSLS_ASSERT_SAFE(1 == - d_state_p->queueKeys().count(queueKey)); + // There is partitionId/queueKey mismatch and this is a + // delayed (aka, buffered) advisory. This is a valid + // scenario. Here's how: Node starts up, initiates + // storage sync with the primary. While recovery is + // underway, a queue, which is active, is deleted and + // unassigned by the primary. Further, same queue is + // opened again, which means leader may assign it to a + // different partition, and will definitely assign it a + // different queue key, and will issue a queue + // assignment advisory. But self will buffer it. When + // recovery is complete, self's storage manager will + // apply all recovered queues (including the previous + // incarnation of this queue) to self's cluster state + // (via 'ClusterStateManager::registerQueueInfo'), and + // thus, populate 'd_queues', and this is how we will + // end up here. So instead of alarming/asserting, we + // simply log at warn, and overwrite current state with + // the buffered (this) advisory and move on. + + BALL_LOG_WARN + << d_cluster_p->description() + << ": overwriting current known queue state " + << "with the buffered advisory for queue [" + << assigned->uri() << "]. Current assigned Partition [" + << assigned->partitionId() << "], current queueKey [" + << assigned->key() << "], new Partition [" + << queueInfo.partitionId() << "], new queueKey [" + << queueKey << "]."; } + + // Remove existing state, mapping, etc.
+ + d_state_p->queueKeys().erase(assigned->key()); + // no need to update d_state_p->domainStates() entry + // , queue was already known and registered + AppInfos appIdInfos(d_allocator_p); + + mqbc::ClusterUtil::parseQueueInfo(&appIdInfos, + queueInfo, + d_allocator_p); + + BSLA_MAYBE_UNUSED const bool rc = d_state_p->assignQueue( + uri, + queueKey, + queueInfo.partitionId(), + appIdInfos); + BSLS_ASSERT_SAFE(rc == false); + } + else { + // Queue is assigned, and there is no partitionId/queueKey + // mismatch. So this assert should not fire. + BSLS_ASSERT_SAFE(1 == d_state_p->queueKeys().count(queueKey)); } } - - if (!queueAlreadyAssigned) { + else { AppInfos appIdInfos(d_allocator_p); mqbc::ClusterUtil::parseQueueInfo(&appIdInfos, @@ -1865,19 +1855,11 @@ void ClusterStateManager::processQueueUnAssignmentAdvisory( mqbu::StorageKey key(mqbu::StorageKey::BinaryRepresentation(), queueInfo.key().data()); - bool hasQueue = true; - const DomainStatesCIter domCit = d_state_p->domainStates().find( - uri.qualifiedDomain()); - if (domCit == d_state_p->domainStates().cend()) { - hasQueue = false; - } - const UriToQueueInfoMapCIter qcit = domCit->second->queuesInfo().find( - uri); - if (qcit == domCit->second->queuesInfo().cend()) { - hasQueue = false; - } + mqbc::ClusterStateQueueInfo* assigned = d_state_p->getAssigned(uri); + // Only Replica can `processQueueUnAssignmentAdvisory`. Therefore, the + // state cannot be `k_UNASSIGNING` - if (!hasQueue) { + if (assigned == 0) { // Queue is not assigned. Error because it should not occur. BALL_LOG_ERROR << d_cluster_p->description() @@ -1892,8 +1874,8 @@ void ClusterStateManager::processQueueUnAssignmentAdvisory( // Self node sees queue as assigned. Validate that the key/partition // from the unassignment match the internal state.
- if ((qcit->second->partitionId() != advisory.partitionId()) || - (qcit->second->key() != key)) { + if ((assigned->partitionId() != advisory.partitionId()) || + (assigned->key() != key)) { // This can occur if a queue is deleted by the primary and created // immediately by the client. Primary broadcasts queue // unassignment advisory upon deleting old instance of the queue, @@ -1915,8 +1897,8 @@ void ClusterStateManager::processQueueUnAssignmentAdvisory( << advisory.partitionId() << ", advisoryKey: " << key << ", internalPartitionId: " - << qcit->second->partitionId() - << ", internalKey: " << qcit->second->key() << "]"; + << assigned->partitionId() + << ", internalKey: " << assigned->key() << "]"; continue; // CONTINUE } d_state_p->unassignQueue(uri); diff --git a/src/groups/mqb/mqbblp/mqbblp_domain.cpp b/src/groups/mqb/mqbblp/mqbblp_domain.cpp index 1f1928ffd..3fd90fb12 100644 --- a/src/groups/mqb/mqbblp/mqbblp_domain.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_domain.cpp @@ -276,13 +276,15 @@ void Domain::updateAuthorizedAppIds(const AppInfos& addedAppIds, } } -void Domain::onQueueAssigned(const mqbc::ClusterStateQueueInfo& info) +void Domain::onQueueAssigned( + const bsl::shared_ptr& info) { // executed by the associated CLUSTER's DISPATCHER thread // PRECONDITIONS BSLS_ASSERT_SAFE( d_cluster_sp->dispatcher()->inDispatcherThread(d_cluster_sp.get())); + BSLS_ASSERT_SAFE(info); if (!d_cluster_sp->isCSLModeEnabled()) { return; // RETURN @@ -292,7 +294,7 @@ void Domain::onQueueAssigned(const mqbc::ClusterStateQueueInfo& info) return; // RETURN } - if (info.uri().domain() != d_name) { + if (info->uri().domain() != d_name) { // Note: This method will fire on all domains which belong to the // cluster having the queue assignment, but we examine the domain // name from the 'uri' to guarantee that only one domain is @@ -301,7 +303,7 @@ void Domain::onQueueAssigned(const mqbc::ClusterStateQueueInfo& info) return; // RETURN } - updateAuthorizedAppIds(info.appInfos()); + 
updateAuthorizedAppIds(info->appInfos()); } void Domain::onQueueUpdated(const bmqt::Uri& uri, diff --git a/src/groups/mqb/mqbblp/mqbblp_domain.h b/src/groups/mqb/mqbblp/mqbblp_domain.h index dc1b76004..4fabd0b45 100644 --- a/src/groups/mqb/mqbblp/mqbblp_domain.h +++ b/src/groups/mqb/mqbblp/mqbblp_domain.h @@ -211,7 +211,8 @@ class Domain BSLS_KEYWORD_FINAL : public mqbi::Domain, /// /// THREAD: This method is invoked in the associated cluster's /// dispatcher thread. - void onQueueAssigned(const mqbc::ClusterStateQueueInfo& info) + void + onQueueAssigned(const bsl::shared_ptr& info) BSLS_KEYWORD_OVERRIDE; /// Callback invoked when a queue with the specified `uri` belonging to diff --git a/src/groups/mqb/mqbc/mqbc_clusterstate.cpp b/src/groups/mqb/mqbc/mqbc_clusterstate.cpp index c98952232..b0615f910 100644 --- a/src/groups/mqb/mqbc/mqbc_clusterstate.cpp +++ b/src/groups/mqb/mqbc/mqbc_clusterstate.cpp @@ -73,13 +73,13 @@ void ClusterStateObserver::onPartitionPrimaryAssignment( } void ClusterStateObserver::onQueueAssigned( - BSLS_ANNOTATION_UNUSED const ClusterStateQueueInfo& info) + BSLS_ANNOTATION_UNUSED const bsl::shared_ptr& info) { // NOTHING } void ClusterStateObserver::onQueueUnassigned( - BSLS_ANNOTATION_UNUSED const ClusterStateQueueInfo& info) + BSLS_ANNOTATION_UNUSED const bsl::shared_ptr& info) { // NOTHING } @@ -322,43 +322,49 @@ bool ClusterState::assignQueue(const bmqt::Uri& uri, // PRECONDITIONS BSLS_ASSERT_SAFE(cluster()->dispatcher()->inDispatcherThread(cluster())); - bool isNewAssignment = true; - const DomainStatesIter domIt = d_domainStates.find(uri.qualifiedDomain()); + bool isNewAssignment = true; + DomainStatesIter domIt = domainStates().find(uri.qualifiedDomain()); + UriToQueueInfoMapIter queueIt; - if (domIt == d_domainStates.end()) { - d_domainStates[uri.qualifiedDomain()].createInplace(d_allocator_p, - d_allocator_p); - d_domainStates.at(uri.qualifiedDomain()) - ->queuesInfo()[uri] - .createInplace(d_allocator_p, - uri, - key, - 
partitionId, - appIdInfos, - d_allocator_p); + if (domIt == domainStates().end()) { + ClusterState::DomainStateSp domainState; + domainState.createInplace(d_allocator_p, d_allocator_p); + domIt = + domainStates().emplace(uri.qualifiedDomain(), domainState).first; + + queueIt = domIt->second->queuesInfo().end(); } else { - const UriToQueueInfoMapIter iter = domIt->second->queuesInfo().find( - uri); - if (iter == domIt->second->queuesInfo().end()) { - domIt->second->queuesInfo()[uri].createInplace(d_allocator_p, - uri, - key, - partitionId, - appIdInfos, - d_allocator_p); - } - else { - isNewAssignment = false; + queueIt = domIt->second->queuesInfo().find(uri); + } + + if (queueIt == domIt->second->queuesInfo().end()) { + QueueInfoSp queueInfo; - updatePartitionQueueMapped(iter->second->partitionId(), -1); - iter->second->setKey(key).setPartitionId(partitionId); - iter->second->appInfos() = appIdInfos; + queueInfo.createInplace(d_allocator_p, + uri, + key, + partitionId, + appIdInfos, + d_allocator_p); - iter->second->setState(ClusterStateQueueInfo::k_ASSIGNED); + queueIt = domIt->second->queuesInfo().emplace(uri, queueInfo).first; + } + else { + if (queueIt->second->state() == ClusterStateQueueInfo::k_ASSIGNED) { + // See 'ClusterStateManager::processQueueAssignmentAdvisory' which + // insists on re-assigning + isNewAssignment = false; + + updatePartitionQueueMapped(queueIt->second->partitionId(), -1); } + queueIt->second->setKey(key).setPartitionId(partitionId); + queueIt->second->appInfos() = appIdInfos; } + // Set the queue as assigned + queueIt->second->setState(ClusterStateQueueInfo::k_ASSIGNED); + updatePartitionQueueMapped(partitionId, 1); bmqu::Printer printer(&appIdInfos); @@ -370,11 +376,7 @@ bool ClusterState::assignQueue(const bmqt::Uri& uri, for (ObserversSetIter it = d_observers.begin(); it != d_observers.end(); ++it) { - (*it)->onQueueAssigned(ClusterStateQueueInfo(uri, - key, - partitionId, - appIdInfos, - d_allocator_p)); + 
(*it)->onQueueAssigned(queueIt->second); } // POSTCONDITIONS @@ -414,7 +416,7 @@ bool ClusterState::unassignQueue(const bmqt::Uri& uri) for (ObserversSetIter it = d_observers.begin(); it != d_observers.end(); ++it) { - (*it)->onQueueUnassigned(*cit->second); + (*it)->onQueueUnassigned(cit->second); } domIt->second->queuesInfo().erase(cit); diff --git a/src/groups/mqb/mqbc/mqbc_clusterstate.h b/src/groups/mqb/mqbc/mqbc_clusterstate.h index 286df4d45..52bdeb054 100644 --- a/src/groups/mqb/mqbc/mqbc_clusterstate.h +++ b/src/groups/mqb/mqbc/mqbc_clusterstate.h @@ -166,7 +166,10 @@ class ClusterStateQueueInfo { typedef mqbi::ClusterStateManager::AppInfosCIter AppInfosCIter; enum State { - // State of Assignment + // State of Assignment. In CSL, assignment and unassignment are async, + // hence the need for k_ASSIGNING/k_UNASSIGNING + // Assigning following unassigning is also supported. + // On Replica, the only possible state is k_ASSIGNED. k_NONE, k_ASSIGNING, @@ -228,7 +231,7 @@ class ClusterStateQueueInfo { /// Set the corresponding member to the specified `value` and return a /// reference offering modifiable access to this object. - bool setState(State value); + void setState(State value); /// Get a modifiable reference to this object's appIdInfos. AppInfos& appInfos(); @@ -312,14 +315,16 @@ class ClusterStateObserver { /// /// THREAD: This method is invoked in the associated cluster's /// dispatcher thread. - virtual void onQueueAssigned(const ClusterStateQueueInfo& info); + virtual void + onQueueAssigned(const bsl::shared_ptr& info); /// Callback invoked when a queue with the specified `info` gets /// unassigned from the cluster. /// /// THREAD: This method is invoked in the associated cluster's /// dispatcher thread. 
- virtual void onQueueUnassigned(const ClusterStateQueueInfo& info); + virtual void + onQueueUnassigned(const bsl::shared_ptr& info); /// Callback invoked when a queue with the specified `uri` belonging to /// the specified `domain` is updated with the optionally specified @@ -655,6 +660,20 @@ class ClusterState { /// validation can be performed. The bahavior is undefined unless /// `partitionId >= 0` and `partitionId < partitionsCount`. const ClusterStatePartitionInfo& partition(int partitionId) const; + + /// Return `ClusterStateQueueInfo` for the specified `uri` or `0` if it + /// does not exist. + ClusterStateQueueInfo* getQueueInfo(const bmqt::Uri& uri) const; + + /// Return `ClusterStateQueueInfo` for the specified `uri` if it exists and + /// is in the `k_ASSIGNED` state or `0` otherwise. + ClusterStateQueueInfo* getAssigned(const bmqt::Uri& uri) const; + + /// Return `ClusterStateQueueInfo` for the specified `uri` if it exists and + /// is in either the `k_ASSIGNED` or `k_UNASSIGNING` state. Return `0` + /// otherwise. 
+ ClusterStateQueueInfo* + getAssignedOrUnassigning(const bmqt::Uri& uri) const; }; // ============================================================================ @@ -811,33 +830,23 @@ inline ClusterStateQueueInfo& ClusterStateQueueInfo::setPartitionId(int value) return *this; } -inline bool ClusterStateQueueInfo::setState(ClusterStateQueueInfo::State value) +inline void ClusterStateQueueInfo::setState(ClusterStateQueueInfo::State value) { - // k_NONE - // | - // ClusterUtil::assignQueue | - // V - // k_ASSIGNING <---+ - // | | - // ClusterState::assignQueue | | - // V | - // k_ASSIGNED | - // | | - // | | ClusterState::assignQueue - // V | - // k_UNASSIGNING - - bool result = false; - - switch (d_state) { - case k_NONE: result = (value == k_ASSIGNING); break; - case k_ASSIGNING: result = (value == k_ASSIGNED); break; - case k_ASSIGNED: result = (value == k_UNASSIGNING); break; - case k_UNASSIGNING: result = (value == k_ASSIGNING); break; - } + // k_NONE + // | | + // ClusterUtil::assignQueue | | + // | V + // | k_ASSIGNING <---+ + // | | | + // ClusterState::assignQueue | | | + // V V | + // k_ASSIGNED | + // | | + // | | ClusterState::assignQueue + // V | + // k_UNASSIGNING d_state = value; - return result; } inline ClusterStateQueueInfo::AppInfos& ClusterStateQueueInfo::appInfos() @@ -1059,6 +1068,46 @@ ClusterState::partition(int partitionId) const return d_partitionsInfo[partitionId]; } +inline ClusterStateQueueInfo* +ClusterState::getQueueInfo(const bmqt::Uri& uri) const +{ + const DomainStatesCIter domCit = domainStates().find( + uri.qualifiedDomain()); + if (domCit == domainStates().cend()) { + return 0; + } + + UriToQueueInfoMapCIter qcit = domCit->second->queuesInfo().find(uri); + if (qcit == domCit->second->queuesInfo().cend()) { + return 0; + } + + return qcit->second.get(); +} + +inline ClusterStateQueueInfo* +ClusterState::getAssigned(const bmqt::Uri& uri) const +{ + ClusterStateQueueInfo* queue = getQueueInfo(uri); + + return queue ? 
queue->state() == ClusterStateQueueInfo::k_ASSIGNED ? queue + : 0 + : 0; +} + +inline ClusterStateQueueInfo* +ClusterState::getAssignedOrUnassigning(const bmqt::Uri& uri) const +{ + ClusterStateQueueInfo* queue = getQueueInfo(uri); + + return queue + ? queue->state() == ClusterStateQueueInfo::k_ASSIGNED || + queue->state() == ClusterStateQueueInfo::k_UNASSIGNING + ? queue + : 0 + : 0; +} + // -------------------------------- // struct ClusterState::DomainState // -------------------------------- diff --git a/src/groups/mqb/mqbc/mqbc_clusterutil.cpp b/src/groups/mqb/mqbc/mqbc_clusterutil.cpp index 87f77ffbf..02a37a3c4 100644 --- a/src/groups/mqb/mqbc/mqbc_clusterutil.cpp +++ b/src/groups/mqb/mqbc/mqbc_clusterutil.cpp @@ -744,19 +744,6 @@ void ClusterUtil::processQueueAssignmentRequest( status.code() = 0; status.message() = ""; - const DomainStatesCIter cit = clusterState->domainStates().find( - uri.qualifiedDomain()); - if (cit != clusterState->domainStates().cend()) { - UriToQueueInfoMapCIter qcit = cit->second->queuesInfo().find(uri); - if (qcit != cit->second->queuesInfo().cend() && - !(cluster->isCSLModeEnabled() && - qcit->second->state() == ClusterStateQueueInfo::k_UNASSIGNING)) { - // Queue is already assigned - clusterData->messageTransmitter().sendMessage(response, requester); - return; // RETURN - } - } - assignQueue(clusterState, clusterData, ledger, @@ -870,6 +857,7 @@ ClusterUtil::assignQueue(ClusterState* clusterState, const bmqp_ctrlmsg::NodeStatus::Value nodeStatus = clusterData->membership().selfNodeStatus(); + if (!cluster->isFSMWorkflow() && bmqp_ctrlmsg::NodeStatus::E_AVAILABLE != nodeStatus) { BALL_LOG_WARN << cluster->description() @@ -886,29 +874,45 @@ ClusterUtil::assignQueue(ClusterState* clusterState, k_ASSIGNMENT_WHILE_UNAVAILABLE; // RETURN } - DomainStatesIter domIt = clusterState->domainStates().find( - uri.qualifiedDomain()); - if (domIt == clusterState->domainStates().end()) { - // REVISIT: This is also done in 
'ClusterState::assignQueue' + ClusterStateQueueInfo::State previousState = ClusterStateQueueInfo::k_NONE; + + ClusterState::DomainStates& domainStates = clusterState->domainStates(); + DomainStatesIter domIt = domainStates.find(uri.qualifiedDomain()); + + UriToQueueInfoMapIter queueIt; - clusterState->domainStates()[uri.qualifiedDomain()].createInplace( - allocator, - allocator); - domIt = clusterState->domainStates().find(uri.qualifiedDomain()); + if (domIt == domainStates.end()) { + ClusterState::DomainStateSp domainState; + domainState.createInplace(allocator, allocator); + domIt = domainStates.emplace(uri.qualifiedDomain(), domainState).first; + + queueIt = domIt->second->queuesInfo().end(); } else { - // Set the queue as assigning (no longer pending unassignment) - - UriToQueueInfoMapCIter qcit = domIt->second->queuesInfo().find(uri); - if (qcit != domIt->second->queuesInfo().cend()) { - if (qcit->second->state() == ClusterStateQueueInfo::k_ASSIGNING) { - BALL_LOG_INFO << cluster->description() - << "queueAssignment of '" << uri - << "' is already pending."; - return QueueAssignmentResult::k_ASSIGNMENT_OK; - } - qcit->second->setState(ClusterStateQueueInfo::k_ASSIGNING); - } + queueIt = domIt->second->queuesInfo().find(uri); + } + + if (queueIt == domIt->second->queuesInfo().end()) { + QueueInfoSp queueInfo; + + queueInfo.createInplace(allocator, uri, allocator); + + queueIt = domIt->second->queuesInfo().emplace(uri, queueInfo).first; + } + else { + previousState = queueIt->second->state(); + } + + if (previousState == ClusterStateQueueInfo::k_ASSIGNING) { + BALL_LOG_INFO << cluster->description() << "queueAssignment of '" + << uri << "' is already pending."; + return QueueAssignmentResult::k_ASSIGNMENT_OK; + } + + if (previousState == ClusterStateQueueInfo::k_ASSIGNED) { + BALL_LOG_INFO << cluster->description() << "queueAssignment of '" + << uri << "' is already done."; + return QueueAssignmentResult::k_ASSIGNMENT_OK; } if (domIt->second->domain() == 0) { @@ 
-982,6 +986,14 @@ ClusterUtil::assignQueue(ClusterState* clusterState, } } + // Set the queue as assigning (no longer pending unassignment) + queueIt->second->setState(ClusterStateQueueInfo::k_ASSIGNING); + + BALL_LOG_INFO << "Cluster [" << cluster->description() + << "]: Transition: " << previousState << " -> " + << ClusterStateQueueInfo::k_ASSIGNING << " for [" << uri + << "]."; + // Populate 'queueAssignmentAdvisory' bdlma::LocalSequentialAllocator<1024> localAllocator(allocator); bmqp_ctrlmsg::ControlMessage controlMsg(&localAllocator); diff --git a/src/groups/mqb/mqbc/mqbc_clusterutil.h b/src/groups/mqb/mqbc/mqbc_clusterutil.h index 0a4f88930..a1c5f6b88 100644 --- a/src/groups/mqb/mqbc/mqbc_clusterutil.h +++ b/src/groups/mqb/mqbc/mqbc_clusterutil.h @@ -133,8 +133,8 @@ struct ClusterUtil { /// Return true if the specified `spoPair` is valid, false otherwise. static bool isValid(const bmqp_ctrlmsg::SyncPointOffsetPair& spoPair); - /// Set the specified `uri` to have the specified `pendingUnassignment` - /// status in the specified `clusterState`. + /// Set the specified `uri` to have the `k_UNASSIGNING` state in the + /// specified `clusterState`. static void setPendingUnassignment(ClusterState* clusterState, const bmqt::Uri& uri); @@ -188,8 +188,8 @@ struct ClusterUtil { /// Process the queue assignment in the specified `request`, received /// from the specified `requester`, using the specified `clusterState`, - /// `clusterData`, `ledger`, `cluster`, `queueAssigningCb` and - /// `allocator`. Return the queue assignment result. + /// `clusterData`, `ledger`, `cluster`, and `allocator`. Return the queue + /// assignment result. /// /// THREAD: This method is invoked in the associated cluster's /// dispatcher thread. 
diff --git a/src/groups/mqb/mqbc/mqbc_incoreclusterstateledger.cpp b/src/groups/mqb/mqbc/mqbc_incoreclusterstateledger.cpp index 834982b4f..378af7452 100644 --- a/src/groups/mqb/mqbc/mqbc_incoreclusterstateledger.cpp +++ b/src/groups/mqb/mqbc/mqbc_incoreclusterstateledger.cpp @@ -330,12 +330,14 @@ int IncoreClusterStateLedger::onLogRolloverCb(const mqbu::StorageKey& oldLogId, ++citer) { const mqbc::ClusterState::QueueInfoSp& infoSp = citer->second; - bmqp_ctrlmsg::QueueInfo queueInfo; - infoSp->key().loadBinary(&queueInfo.key()); - queueInfo.uri() = infoSp->uri().asString(); - queueInfo.partitionId() = infoSp->partitionId(); + if (infoSp->state() == ClusterStateQueueInfo::k_ASSIGNED) { + bmqp_ctrlmsg::QueueInfo queueInfo; + infoSp->key().loadBinary(&queueInfo.key()); + queueInfo.uri() = infoSp->uri().asString(); + queueInfo.partitionId() = infoSp->partitionId(); - leaderAdvisory.queues().push_back(queueInfo); + leaderAdvisory.queues().push_back(queueInfo); + } } }