Skip to content

Commit

Permalink
Addressing review; fixing bad merge
Browse files Browse the repository at this point in the history
Signed-off-by: dorjesinpo <[email protected]>
  • Loading branch information
dorjesinpo committed Dec 7, 2024
1 parent 085e02f commit 263600c
Show file tree
Hide file tree
Showing 10 changed files with 379 additions and 300 deletions.
145 changes: 94 additions & 51 deletions src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp

Large diffs are not rendered by default.

34 changes: 10 additions & 24 deletions src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ class ClusterQueueHelper BSLS_KEYWORD_FINAL
/// Reset the `id`, `partitionId`, `key` and `queue` members of this
/// object. Note that `uri` is left untouched because it is an
/// invariant member of a given instance of such a QueueInfo object.
void resetAndKeepPending();
void resetButKeepPending();
};

struct StopContext {
Expand Down Expand Up @@ -910,10 +910,6 @@ class ClusterQueueHelper BSLS_KEYWORD_FINAL
/// assigned.
bool isQueueAssigned(const QueueContext& queueContext) const;

/// Return true if the queue in the specified `queueContext` is
/// pending unassignment (async in CSL).
bool isQueuePendingUnassignment(const QueueContext& queueContext) const;

/// Return true if the queue in the specified `queueContext` is assigned
/// and its associated primary is AVAILABLE and is different from the
/// optionally specified `otherThan`.
Expand Down Expand Up @@ -959,15 +955,17 @@ class ClusterQueueHelper BSLS_KEYWORD_FINAL
///
/// THREAD: This method is invoked in the associated cluster's
/// dispatcher thread.
void onQueueAssigned(const mqbc::ClusterStateQueueInfo& info)
void
onQueueAssigned(const bsl::shared_ptr<mqbc::ClusterStateQueueInfo>& info)
BSLS_KEYWORD_OVERRIDE;

/// Callback invoked when a queue with the specified `info` gets
/// unassigned from the cluster.
///
/// THREAD: This method is invoked in the associated cluster's
/// dispatcher thread.
void onQueueUnassigned(const mqbc::ClusterStateQueueInfo& info)
void
onQueueUnassigned(const bsl::shared_ptr<mqbc::ClusterStateQueueInfo>& info)
BSLS_KEYWORD_OVERRIDE;

/// Callback invoked when a queue with the specified `uri` belonging to
Expand Down Expand Up @@ -1240,31 +1238,19 @@ ClusterQueueHelper::isQueueAssigned(const QueueContext& queueContext) const
bmqp::QueueId::k_UNASSIGNED_QUEUE_ID; // RETURN
}

DomainStatesCIter domCit = d_clusterState_p->domainStates().find(
queueContext.uri().qualifiedDomain());
if (domCit == d_clusterState_p->domainStates().cend()) {
return false; // RETURN
}

UriToQueueInfoMapCIter qCit = domCit->second->queuesInfo().find(
mqbc::ClusterStateQueueInfo* assigned = d_clusterState_p->getAssigned(
queueContext.uri());
if (qCit == domCit->second->queuesInfo().cend()) {

if (assigned == 0) {
return false; // RETURN
}

BSLS_ASSERT_SAFE(qCit->second->partitionId() !=
BSLS_ASSERT_SAFE(assigned->partitionId() !=
mqbs::DataStore::k_INVALID_PARTITION_ID &&
!qCit->second->key().isNull());
!assigned->key().isNull());
return true;
}

// Return true if the state info referenced by the specified `queueContext`
// (its `d_stateQInfo_sp` member) reports `pendingUnassignment()`, and false
// otherwise, including when `d_stateQInfo_sp` is unset.
inline bool ClusterQueueHelper::isQueuePendingUnassignment(
const QueueContext& queueContext) const
{
const ClusterStateQueueInfoCSp& state = queueContext.d_stateQInfo_sp;
// Guard against an unset `state` before dereferencing it.
return state ? state->pendingUnassignment() : false;
}

inline bool ClusterQueueHelper::isQueuePrimaryAvailable(
const QueueContext& queueContext,
mqbnet::ClusterNode* otherThan) const
Expand Down
210 changes: 96 additions & 114 deletions src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1601,109 +1601,99 @@ void ClusterStateManager::processQueueAssignmentAdvisory(
mqbu::StorageKey::BinaryRepresentation(),
queueInfo.key().data());

bool queueAlreadyAssigned = false;
const DomainStatesCIter domCit = d_state_p->domainStates().find(
uri.qualifiedDomain());
if (domCit != d_state_p->domainStates().cend()) {
UriToQueueInfoMapCIter qcit = domCit->second->queuesInfo().find(
uri);
if (qcit != domCit->second->queuesInfo().cend()) {
// Queue is assigned. Verify that the key and partition match
// with what we already have.
queueAlreadyAssigned = true;

if (qcit->second->partitionId() != queueInfo.partitionId() ||
(qcit->second->key() != queueKey)) {
if (!delayed) {
// Leader is telling self node to map a queue to new
// partition or have a new key (basically, it's a new
// incarnation of the queue). This could occur when a
// queue is being opened-closed-opened in very quick
// succession. Old instance of the queue is deleted by
// the primary, primary broadcasts queue-unassignment
// advisory, leader broadcasts queue-assignment
// advisory for the new instance of the queue, but self
// node receives those 2 broadcasts out of order
// (leader's advisory followed by primary's advisory).
// In this case, it's beneficial to force-update self's
// view of the queue with what the leader is
// advertising (with an error). When self receives
// queue-unassignment advisory from the primary for the
// old instance of the queue, it will log an error and
// ignore it.

BALL_LOG_ERROR
<< d_cluster_p->description() << ": "
<< "received queueAssignmentAdvisory from leader '"
<< source->nodeDescription() << "' for a known and"
<< " assigned queue with different "
<< "partitionId/key: [received: " << queueInfo
<< ", knownPartitionId: "
<< qcit->second->partitionId()
<< ", knownQueueKey: " << qcit->second->key()
<< "]";
}
else {
// There is a partitionId/queueKey mismatch and this is a
// delayed (aka, buffered) advisory. This is a valid
// scenario. Here's how: Node starts up, initiates
// storage sync with the primary. While recovery is
// underway, a queue, which is active, is deleted and
// unassigned by the primary. Further, same queue is
// opened again, which means leader may assign it to a
// different partition, and will definitely assign it a
// different queue key, and will issue a queue
// assignment advisory. But self will buffer it. When
// recovery is complete, self's storage manager will
// apply all recovered queues (including the previous
// incarnation of this queue) to self's cluster state
// (via 'ClusterStateManager::registerQueueInfo'), and
// thus, populate 'd_queues', and this is how we will
// end up here. So instead of alarming/asserting, we
// simply log at warn, and overwrite current state with
// the buffered (this) advisory and move on.

BALL_LOG_WARN
<< d_cluster_p->description()
<< ": overwriting current known queue state "
<< "with the buffered advisory for queue ["
<< qcit->second->uri()
<< "]. Current assigned Partition ["
<< qcit->second->partitionId()
<< "], current queueKey [" << qcit->second->key()
<< "], new Partition [" << queueInfo.partitionId()
<< "], new queueKey [" << queueKey << "].";
}

// Remove existing state, mapping, etc.

d_state_p->queueKeys().erase(qcit->second->key());

// No need to update the d_state_p->domainStates() entry:
// the queue was already known and registered.
AppInfos appIdInfos(d_allocator_p);

mqbc::ClusterUtil::parseQueueInfo(&appIdInfos,
queueInfo,
d_allocator_p);

BSLA_MAYBE_UNUSED const bool rc = d_state_p->assignQueue(
uri,
queueKey,
queueInfo.partitionId(),
appIdInfos);
BSLS_ASSERT_SAFE(rc == false);
mqbc::ClusterStateQueueInfo* assigned = d_state_p->getAssigned(uri);
// Only Replica can `processQueueAssignmentAdvisory`. Therefore, the
// state cannot be `k_UNASSIGNING`

if (assigned) {
// Queue is assigned. Verify that the key and partition match
// with what we already have.

if (assigned->partitionId() != queueInfo.partitionId() ||
(assigned->key() != queueKey)) {
if (!delayed) {
// Leader is telling self node to map a queue to new
// partition or have a new key (basically, it's a new
// incarnation of the queue). This could occur when a
// queue is being opened-closed-opened in very quick
// succession. Old instance of the queue is deleted by
// the primary, primary broadcasts queue-unassignment
// advisory, leader broadcasts queue-assignment
// advisory for the new instance of the queue, but self
// node receives those 2 broadcasts out of order
// (leader's advisory followed by primary's advisory).
// In this case, it's beneficial to force-update self's
// view of the queue with what the leader is
// advertising (with an error). When self receives
// queue-unassignment advisory from the primary for the
// old instance of the queue, it will log an error and
// ignore it.

BALL_LOG_ERROR
<< d_cluster_p->description() << ": "
<< "received queueAssignmentAdvisory from leader '"
<< source->nodeDescription() << "' for a known and"
<< " assigned queue with different "
<< "partitionId/key: [received: " << queueInfo
<< ", knownPartitionId: " << assigned->partitionId()
<< ", knownQueueKey: " << assigned->key() << "]";
}
else {
// Queue is assigned, and there is no partitionId/queueKey
// mismatch. So this assert should not fire.
BSLS_ASSERT_SAFE(1 ==
d_state_p->queueKeys().count(queueKey));
// There is a partitionId/queueKey mismatch and this is a
// delayed (aka, buffered) advisory. This is a valid
// scenario. Here's how: Node starts up, initiates
// storage sync with the primary. While recovery is
// underway, a queue, which is active, is deleted and
// unassigned by the primary. Further, same queue is
// opened again, which means leader may assign it to a
// different partition, and will definitely assign it a
// different queue key, and will issue a queue
// assignment advisory. But self will buffer it. When
// recovery is complete, self's storage manager will
// apply all recovered queues (including the previous
// incarnation of this queue) to self's cluster state
// (via 'ClusterStateManager::registerQueueInfo'), and
// thus, populate 'd_queues', and this is how we will
// end up here. So instead of alarming/asserting, we
// simply log at warn, and overwrite current state with
// the buffered (this) advisory and move on.

BALL_LOG_WARN
<< d_cluster_p->description()
<< ": overwriting current known queue state "
<< "with the buffered advisory for queue ["
<< assigned->uri() << "]. Current assigned Partition ["
<< assigned->partitionId() << "], current queueKey ["
<< assigned->key() << "], new Partition ["
<< queueInfo.partitionId() << "], new queueKey ["
<< queueKey << "].";
}

// Remove existing state, mapping, etc.

d_state_p->queueKeys().erase(assigned->key());
// No need to update the d_state_p->domainStates() entry:
// the queue was already known and registered.
AppInfos appIdInfos(d_allocator_p);

mqbc::ClusterUtil::parseQueueInfo(&appIdInfos,
queueInfo,
d_allocator_p);

BSLA_MAYBE_UNUSED const bool rc = d_state_p->assignQueue(
uri,
queueKey,
queueInfo.partitionId(),
appIdInfos);
BSLS_ASSERT_SAFE(rc == false);
}
else {
// Queue is assigned, and there is no partitionId/queueKey
// mismatch. So this assert should not fire.
BSLS_ASSERT_SAFE(1 == d_state_p->queueKeys().count(queueKey));
}
}

if (!queueAlreadyAssigned) {
else {
AppInfos appIdInfos(d_allocator_p);

mqbc::ClusterUtil::parseQueueInfo(&appIdInfos,
Expand Down Expand Up @@ -1865,19 +1855,11 @@ void ClusterStateManager::processQueueUnAssignmentAdvisory(
mqbu::StorageKey key(mqbu::StorageKey::BinaryRepresentation(),
queueInfo.key().data());

bool hasQueue = true;
const DomainStatesCIter domCit = d_state_p->domainStates().find(
uri.qualifiedDomain());
if (domCit == d_state_p->domainStates().cend()) {
hasQueue = false;
}
const UriToQueueInfoMapCIter qcit = domCit->second->queuesInfo().find(
uri);
if (qcit == domCit->second->queuesInfo().cend()) {
hasQueue = false;
}
mqbc::ClusterStateQueueInfo* assigned = d_state_p->getAssigned(uri);
// Only Replica can `processQueueUnAssignmentAdvisory`. Therefore, the
// state cannot be `k_UNASSIGNING`.

if (!hasQueue) {
if (assigned == 0) {
// Queue is not assigned. Error because it should not occur.

BALL_LOG_ERROR << d_cluster_p->description()
Expand All @@ -1892,8 +1874,8 @@ void ClusterStateManager::processQueueUnAssignmentAdvisory(
// Self node sees queue as assigned. Validate that the key/partition
// from the unassignment match the internal state.

if ((qcit->second->partitionId() != advisory.partitionId()) ||
(qcit->second->key() != key)) {
if ((assigned->partitionId() != advisory.partitionId()) ||
(assigned->key() != key)) {
// This can occur if a queue is deleted by the primary and created
// immediately by the client. Primary broadcasts queue
// unassignment advisory upon deleting old instance of the queue,
Expand All @@ -1915,8 +1897,8 @@ void ClusterStateManager::processQueueUnAssignmentAdvisory(
<< advisory.partitionId()
<< ", advisoryKey: " << key
<< ", internalPartitionId: "
<< qcit->second->partitionId()
<< ", internalKey: " << qcit->second->key() << "]";
<< assigned->partitionId()
<< ", internalKey: " << assigned->key() << "]";
continue; // CONTINUE
}
d_state_p->unassignQueue(uri);
Expand Down
8 changes: 5 additions & 3 deletions src/groups/mqb/mqbblp/mqbblp_domain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -276,13 +276,15 @@ void Domain::updateAuthorizedAppIds(const AppInfos& addedAppIds,
}
}

void Domain::onQueueAssigned(const mqbc::ClusterStateQueueInfo& info)
void Domain::onQueueAssigned(
const bsl::shared_ptr<mqbc::ClusterStateQueueInfo>& info)
{
// executed by the associated CLUSTER's DISPATCHER thread

// PRECONDITIONS
BSLS_ASSERT_SAFE(
d_cluster_sp->dispatcher()->inDispatcherThread(d_cluster_sp.get()));
BSLS_ASSERT_SAFE(info);

if (!d_cluster_sp->isCSLModeEnabled()) {
return; // RETURN
Expand All @@ -292,7 +294,7 @@ void Domain::onQueueAssigned(const mqbc::ClusterStateQueueInfo& info)
return; // RETURN
}

if (info.uri().domain() != d_name) {
if (info->uri().domain() != d_name) {
// Note: This method will fire on all domains which belong to the
// cluster having the queue assignment, but we examine the domain
// name from the 'uri' to guarantee that only one domain is
Expand All @@ -301,7 +303,7 @@ void Domain::onQueueAssigned(const mqbc::ClusterStateQueueInfo& info)
return; // RETURN
}

updateAuthorizedAppIds(info.appInfos());
updateAuthorizedAppIds(info->appInfos());
}

void Domain::onQueueUpdated(const bmqt::Uri& uri,
Expand Down
3 changes: 2 additions & 1 deletion src/groups/mqb/mqbblp/mqbblp_domain.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,8 @@ class Domain BSLS_KEYWORD_FINAL : public mqbi::Domain,
///
/// THREAD: This method is invoked in the associated cluster's
/// dispatcher thread.
void onQueueAssigned(const mqbc::ClusterStateQueueInfo& info)
void
onQueueAssigned(const bsl::shared_ptr<mqbc::ClusterStateQueueInfo>& info)
BSLS_KEYWORD_OVERRIDE;

/// Callback invoked when a queue with the specified `uri` belonging to
Expand Down
Loading

0 comments on commit 263600c

Please sign in to comment.