Skip to content

Commit

Permalink
Getting rid of (un)assigningCb
Browse files Browse the repository at this point in the history
Signed-off-by: dorjesinpo <[email protected]>
  • Loading branch information
dorjesinpo committed Nov 10, 2024
1 parent 613c5ba commit d4ab844
Show file tree
Hide file tree
Showing 10 changed files with 107 additions and 512 deletions.
304 changes: 47 additions & 257 deletions src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp

Large diffs are not rendered by default.

45 changes: 13 additions & 32 deletions src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ class ClusterQueueHelper : public mqbc::ClusterStateObserver,
/// Reset the `id`, `partitionId`, `key` and `queue` members of this
/// object. Note that `uri` is left untouched because it is an
/// invariant member of a given instance of such a QueueInfo object.
void reset();
void resetAndKeepPending();
};

struct StopContext {
Expand Down Expand Up @@ -522,29 +522,6 @@ class ClusterQueueHelper : public mqbc::ClusterStateObserver,
QueueAssignmentResult::Enum
assignQueue(const QueueContextSp& queueContext);

/// Called when the specified `uri` is in the process of being assigned.
/// If the specified `processingPendingRequests` is true, we will
/// process pending requests on this machine.
///
/// THREAD: This method is invoked in the associated cluster's
/// dispatcher thread.
void onQueueAssigning(const bmqt::Uri& uri,
bool processingPendingRequests);

/// Called when the queue with the specified `queueInfo` is being
/// unassigned. Load into the specified `hasInFlightRequests` whether
/// there are still in-flight requests for the queue. Return true on
/// success, or false on failure.
///
/// THREAD: This method is invoked in the associated cluster's
/// dispatcher thread.
///
/// TODO_CSL: This is the current workflow which we should be able to
/// remove after the new workflow via
/// ClusterQueueHelper::onQueueUnassigned() is stable.
bool onQueueUnassigning(bool* hasInFlightRequests,
const bmqp_ctrlmsg::QueueInfo& queueInfo);

/// Send a queueAssignment request to the leader, requesting assignment
/// of the queue with the specified `uri`. This method is called only
/// on a non leader node of a cluster member, for a cluster having a
Expand Down Expand Up @@ -932,6 +909,10 @@ class ClusterQueueHelper : public mqbc::ClusterStateObserver,
/// assigned.
bool isQueueAssigned(const QueueContext& queueContext) const;

/// Return true if the queue in the specified `queueContext` is
/// pending unassignment (async in CSL).
bool isQueuePendingUnassignment(const QueueContext& queueContext) const;

/// Return true if the queue in the specified `queueContext` is assigned
/// and its associated primary is AVAILABLE and is different from the
/// optionally specified `otherThan`.
Expand Down Expand Up @@ -1108,13 +1089,6 @@ class ClusterQueueHelper : public mqbc::ClusterStateObserver,
/// failure.
int gcExpiredQueues(bool immediate = false);

ClusterQueueHelper& setOnQueueAssignedCb(const OnQueueAssignedCb& value);

/// Set the corresponding member to the specified `value` and return a
/// reference offering modifiable access to this object.
ClusterQueueHelper&
setOnQueueUnassignedCb(const OnQueueUnassignedCb& value);

/// Start executing multi-step processing of StopRequest or CLOSING node
/// advisory received from the specified `clusterNode`. In the case of
/// StopRequest the specified `request` references the request; in the
Expand All @@ -1134,8 +1108,8 @@ class ClusterQueueHelper : public mqbc::ClusterStateObserver,
mqbc::ClusterNodeSession* ns,
const VoidFunctor& callback = VoidFunctor());

/// Called upon leader becoming available.
void onLeaderAvailable();
// Called upon leader becoming available.

// ACCESSORS

Expand Down Expand Up @@ -1283,6 +1257,13 @@ ClusterQueueHelper::isQueueAssigned(const QueueContext& queueContext) const
return true;
}

inline bool ClusterQueueHelper::isQueuePendingUnassignment(
const QueueContext& queueContext) const
{
const ClusterStateQueueInfoCSp& state = queueContext.d_stateQInfo_sp;
return state ? state->pendingUnassignment() : false;
}

inline bool ClusterQueueHelper::isQueuePrimaryAvailable(
const QueueContext& queueContext,
mqbnet::ClusterNode* otherThan) const
Expand Down
89 changes: 20 additions & 69 deletions src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1167,7 +1167,6 @@ ClusterStateManager::assignQueue(const bmqt::Uri& uri,
d_clusterStateLedger_mp.get(),
d_cluster_p,
uri,
d_queueAssigningCb,
d_allocator_p,
status);
}
Expand All @@ -1189,7 +1188,6 @@ void ClusterStateManager::registerQueueInfo(const bmqt::Uri& uri,
partitionId,
queueKey,
appIdInfos,
d_queueAssigningCb,
forceUpdate);
}

Expand All @@ -1213,6 +1211,25 @@ void ClusterStateManager::unassignQueue(
<< ": Failed to apply queue unassignment advisory: "
<< advisory << ", rc: " << rc;
}
else {
// In non-CSL mode this is the shortcut to call Primary CQH instead of
// waiting for the quorum of acks in the ledger.
for (bsl::vector<bmqp_ctrlmsg::QueueInfo>::const_iterator cit =
advisory.queues().begin();
cit != advisory.queues().end();
++cit) {
const bmqp_ctrlmsg::QueueInfo& queueInfo = *cit;

if (d_state_p->unassignQueue(queueInfo.uri())) {
BALL_LOG_INFO << d_clusterData_p->identity().description()
<< ": Queue unassigned: " << queueInfo;
}
else {
BALL_LOG_INFO << d_clusterData_p->identity().description()
<< ": Failed to unassign Queue: " << queueInfo;
}
}
}
}

void ClusterStateManager::sendClusterState(
Expand Down Expand Up @@ -1462,7 +1479,6 @@ void ClusterStateManager::processQueueAssignmentRequest(
d_cluster_p,
request,
requester,
d_queueAssigningCb,
d_allocator_p);
}

Expand Down Expand Up @@ -1663,25 +1679,6 @@ void ClusterStateManager::processQueueAssignmentAdvisory(

d_state_p->queueKeys().erase(qcit->second->key());

mqbc::ClusterState::QueueKeysInsertRc irc =
d_state_p->queueKeys().insert(queueKey);
if (false == irc.second) {
// QueueKey provided by the leader is not unique. This
// is bad, as thing means that 2 different queue URIs
// have queue key. Unfortunately we can't retrieve the
// URI of the 'other' queue.

BMQTSK_ALARMLOG_ALARM("CLUSTER_STATE")
<< d_cluster_p->description()
<< ": queueKey clash while applying"
<< (delayed ? " buffered " : " ")
<< "queue assignment advisory: " << queueInfo
<< ". QueueKey [" << queueKey
<< "]. Ignoring this entry in the advisory msg."
<< BMQTSK_ALARMLOG_END;
continue; // CONTINUE
}

// no need to update d_state_p->domainStates() entry
// , queue was already known and registered
AppInfos appIdInfos(d_allocator_p);
Expand All @@ -1707,27 +1704,6 @@ void ClusterStateManager::processQueueAssignmentAdvisory(
}

if (!queueAlreadyAssigned) {
// Since self node doesn't see the queue as assigned, the
// queueKey specified in the advisory must not occur in
// 'queueKeys' data structure.
mqbc::ClusterState::QueueKeysInsertRc insertRc =
d_state_p->queueKeys().insert(queueKey);

if (false == insertRc.second) {
// QueueKey is not unique.

BMQTSK_ALARMLOG_ALARM("CLUSTER_STATE")
<< d_cluster_p->description() << ": attemping to apply"
<< (delayed ? " buffered " : " ")
<< " queueAssignmentAdvisory from leader ["
<< source->nodeDescription() << "] for an unknown queue ["
<< uri << "] assigned to Partition ["
<< queueInfo.partitionId() << "], but queueKey ["
<< queueKey << "] is not unique. Ignoring this entry in "
<< "the advisory." << BMQTSK_ALARMLOG_END;
continue; // CONTINUE
}

AppInfos appIdInfos(d_allocator_p);

mqbc::ClusterUtil::parseQueueInfo(&appIdInfos,
Expand All @@ -1738,16 +1714,10 @@ void ClusterStateManager::processQueueAssignmentAdvisory(
queueKey,
queueInfo.partitionId(),
appIdInfos);

d_state_p->domainStates()
.at(uri.qualifiedDomain())
->adjustQueueCount(1);
}

BALL_LOG_INFO << d_cluster_p->description()
<< ": Queue assigned: " << queueInfo;

d_queueAssigningCb(uri, true); // processingPendingRequests
}
}

Expand Down Expand Up @@ -1949,26 +1919,7 @@ void ClusterStateManager::processQueueUnAssignmentAdvisory(
<< ", internalKey: " << qcit->second->key() << "]";
continue; // CONTINUE
}

bool hasInFlightRequests = false;
if (d_queueUnassigningCb(&hasInFlightRequests, queueInfo)) {
const mqbu::StorageKey queueKey = qcit->second->key();
if (hasInFlightRequests) {
d_state_p->updatePartitionQueueMapped(queueInfo.partitionId(),
-1);
}
else {
d_state_p->unassignQueue(uri);
}

d_state_p->queueKeys().erase(queueKey);
d_state_p->domainStates()
.at(uri.qualifiedDomain())
->adjustQueueCount(-1);

BALL_LOG_INFO << d_cluster_p->description()
<< ": Unassigned queue: " << queueInfo;
}
d_state_p->unassignQueue(uri);
}
}

Expand Down
24 changes: 0 additions & 24 deletions src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,6 @@ class ClusterStateManager : public mqbc::ClusterStateObserver,
// A queue advisory can be either an
// assignment or an un-assignment msg.

QueueAssigningCb d_queueAssigningCb;

QueueUnassigningCb d_queueUnassigningCb;

AfterPartitionPrimaryAssignmentCb d_afterPartitionPrimaryAssignmentCb;

private:
Expand Down Expand Up @@ -303,14 +299,6 @@ class ClusterStateManager : public mqbc::ClusterStateObserver,
virtual void
setStorageManager(mqbi::StorageManager* value) BSLS_KEYWORD_OVERRIDE;

/// Set the queue assigning callback to the specified `value`.
virtual void
setQueueAssigningCb(const QueueAssigningCb& value) BSLS_KEYWORD_OVERRIDE;

virtual void setQueueUnassigningCb(const QueueUnassigningCb& value)
BSLS_KEYWORD_OVERRIDE;
// Set the queue unassigning callback to the specified 'value'.

virtual void setAfterPartitionPrimaryAssignmentCb(
const AfterPartitionPrimaryAssignmentCb& value) BSLS_KEYWORD_OVERRIDE;
// Set the after partition primary assignment callback to the specified
Expand Down Expand Up @@ -647,18 +635,6 @@ inline void ClusterStateManager::setStorageManager(mqbi::StorageManager* value)
d_storageManager_p = value;
}

inline void
ClusterStateManager::setQueueAssigningCb(const QueueAssigningCb& value)
{
d_queueAssigningCb = value;
}

inline void
ClusterStateManager::setQueueUnassigningCb(const QueueUnassigningCb& value)
{
d_queueUnassigningCb = value;
}

inline void ClusterStateManager::setAfterPartitionPrimaryAssignmentCb(
const AfterPartitionPrimaryAssignmentCb& value)
{
Expand Down
1 change: 1 addition & 0 deletions src/groups/mqb/mqbc/mqbc_clusterstate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,7 @@ bool ClusterState::assignQueue(const bmqt::Uri& uri,
updatePartitionQueueMapped(iter->second->partitionId(), -1);
iter->second->setKey(key).setPartitionId(partitionId);
iter->second->appInfos() = appIdInfos;
// TODO: in what scenario 'pendingUnassignment() == true'?
iter->second->setPendingUnassignment(false);
}
}
Expand Down
4 changes: 0 additions & 4 deletions src/groups/mqb/mqbc/mqbc_clusterstatemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ ClusterStateManager::ClusterStateManager(
// TODO Add cluster config to determine Eventual vs Strong
, d_clusterStateLedger_mp(clusterStateLedger)
, d_storageManager_p(0)
, d_queueAssigningCb()
, d_afterPartitionPrimaryAssignmentCb()
{
// PRECONDITIONS
Expand Down Expand Up @@ -1501,7 +1500,6 @@ ClusterStateManager::assignQueue(const bmqt::Uri& uri,
d_clusterStateLedger_mp.get(),
d_cluster_p,
uri,
d_queueAssigningCb,
d_allocator_p,
status);
}
Expand All @@ -1523,7 +1521,6 @@ void ClusterStateManager::registerQueueInfo(const bmqt::Uri& uri,
partitionId,
queueKey,
appIdInfos,
d_queueAssigningCb,
forceUpdate);
}

Expand Down Expand Up @@ -1810,7 +1807,6 @@ void ClusterStateManager::processQueueAssignmentRequest(
d_cluster_p,
request,
requester,
d_queueAssigningCb,
d_allocator_p);
}

Expand Down
25 changes: 0 additions & 25 deletions src/groups/mqb/mqbc/mqbc_clusterstatemanager.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,6 @@ class ClusterStateManager

mqbi::StorageManager* d_storageManager_p;

QueueAssigningCb d_queueAssigningCb;

AfterPartitionPrimaryAssignmentCb d_afterPartitionPrimaryAssignmentCb;

private:
Expand Down Expand Up @@ -406,14 +404,6 @@ class ClusterStateManager
virtual void
setStorageManager(mqbi::StorageManager* value) BSLS_KEYWORD_OVERRIDE;

/// Set the queue assigning callback to the specified `value`.
virtual void
setQueueAssigningCb(const QueueAssigningCb& value) BSLS_KEYWORD_OVERRIDE;

virtual void setQueueUnassigningCb(const QueueUnassigningCb& value)
BSLS_KEYWORD_OVERRIDE;
// Set the queue unassigning callback to the specified 'value'.

/// Set the after partition primary assignment callback to the specified
/// `value`.
virtual void setAfterPartitionPrimaryAssignmentCb(
Expand Down Expand Up @@ -764,21 +754,6 @@ inline void ClusterStateManager::setStorageManager(mqbi::StorageManager* value)
d_storageManager_p = value;
}

inline void
ClusterStateManager::setQueueAssigningCb(const QueueAssigningCb& value)
{
d_queueAssigningCb = value;
}

inline void ClusterStateManager::setQueueUnassigningCb(
BSLS_ANNOTATION_UNUSED const QueueUnassigningCb& value)
{
// Note that QueueUnassigningCb is only ever used in non-CSL mode, so we
// can ignore it.
//
// NOTHING
}

inline void ClusterStateManager::setAfterPartitionPrimaryAssignmentCb(
const AfterPartitionPrimaryAssignmentCb& value)
{
Expand Down
Loading

0 comments on commit d4ab844

Please sign in to comment.