Skip to content

Commit

Permalink
fix: generate AppKey only once (#469)
Browse files Browse the repository at this point in the history
* fix: generate AppKey only once

Signed-off-by: dorjesinpo <[email protected]>

* switching to CSL app key gen everywhere

Signed-off-by: dorjesinpo <[email protected]>

* more fixes and an IT

Signed-off-by: dorjesinpo <[email protected]>

---------

Signed-off-by: dorjesinpo <[email protected]>
  • Loading branch information
dorjesinpo authored Oct 28, 2024
1 parent 8550a81 commit afdbba2
Show file tree
Hide file tree
Showing 50 changed files with 622 additions and 717 deletions.
49 changes: 21 additions & 28 deletions src/groups/mqb/mqbblp/mqbblp_cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2209,27 +2209,16 @@ void Cluster::onRecoveryStatusDispatched(
const bmqt::Uri uri(itMp->uri().canonical());
BSLS_ASSERT_SAFE(itMp->storage()->partitionId() ==
static_cast<int>(pid));
if (isCSLModeEnabled()) {
AppIdKeyPairs appIdKeyPairs;
itMp->storage()->loadVirtualStorageDetails(&appIdKeyPairs);
AppIdInfos appIdInfos(appIdKeyPairs.cbegin(),
appIdKeyPairs.cend());

d_clusterOrchestrator.registerQueueInfo(
uri,
pid,
itMp->storage()->queueKey(),
appIdInfos,
false); // Force-update?
}
else {
d_clusterOrchestrator.registerQueueInfo(
uri,
pid,
itMp->storage()->queueKey(),
AppIdInfos(),
false); // Force-update?
}

AppInfos appIdInfos;
itMp->storage()->loadVirtualStorageDetails(&appIdInfos);

d_clusterOrchestrator.registerQueueInfo(
uri,
pid,
itMp->storage()->queueKey(),
appIdInfos,
false); // Force-update?

++(*itMp);
}
Expand Down Expand Up @@ -2844,18 +2833,22 @@ void Cluster::onDomainReconfigured(const mqbi::Domain& domain,
}

// Compute list of added and removed App IDs.
bsl::vector<bsl::string> oldCfgAppIds(oldDefn.mode().fanout().appIDs(),
d_allocator_p);
bsl::vector<bsl::string> newCfgAppIds(newDefn.mode().fanout().appIDs(),
d_allocator_p);

bsl::vector<bsl::string> addedIds, removedIds;
bsl::unordered_set<bsl::string> oldCfgAppIds(
oldDefn.mode().fanout().appIDs().cbegin(),
oldDefn.mode().fanout().appIDs().cend(),
d_allocator_p);
bsl::unordered_set<bsl::string> newCfgAppIds(
newDefn.mode().fanout().appIDs().cbegin(),
newDefn.mode().fanout().appIDs().cend(),
d_allocator_p);

bsl::unordered_set<bsl::string> addedIds, removedIds;
mqbc::StorageUtil::loadAddedAndRemovedEntries(&addedIds,
&removedIds,
oldCfgAppIds,
newCfgAppIds);

bsl::vector<bsl::string>::const_iterator it = addedIds.begin();
bsl::unordered_set<bsl::string>::const_iterator it = addedIds.cbegin();
for (; it != addedIds.cend(); ++it) {
dispatcher()->execute(
bdlf::BindUtil::bind(&ClusterOrchestrator::registerAppId,
Expand Down
4 changes: 1 addition & 3 deletions src/groups/mqb/mqbblp/mqbblp_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,9 @@ class Cluster : public mqbi::Cluster,

private:
// PRIVATE TYPES
typedef mqbi::Storage::AppIdKeyPairs AppIdKeyPairs;

typedef mqbc::ClusterStatePartitionInfo ClusterStatePartitionInfo;

typedef mqbc::ClusterStateQueueInfo::AppIdInfos AppIdInfos;
typedef mqbc::ClusterStateQueueInfo::AppInfos AppInfos;

typedef mqbc::ClusterMembership::ClusterNodeSessionSp ClusterNodeSessionSp;

Expand Down
2 changes: 1 addition & 1 deletion src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ void ClusterOrchestrator::processBufferedQueueAdvisories()
void ClusterOrchestrator::registerQueueInfo(const bmqt::Uri& uri,
int partitionId,
const mqbu::StorageKey& queueKey,
const AppIdInfos& appIdInfos,
const AppInfos& appIdInfos,
bool forceUpdate)
{
// executed by the *DISPATCHER* thread
Expand Down
4 changes: 2 additions & 2 deletions src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class ClusterOrchestrator {

typedef bdlmt::EventScheduler::RecurringEventHandle RecurringEventHandle;

typedef mqbc::ClusterStateQueueInfo::AppIdInfos AppIdInfos;
typedef mqbc::ClusterStateQueueInfo::AppInfos AppInfos;

private:
// DATA
Expand Down Expand Up @@ -515,7 +515,7 @@ class ClusterOrchestrator {
void registerQueueInfo(const bmqt::Uri& uri,
int partitionId,
const mqbu::StorageKey& queueKey,
const AppIdInfos& appIdInfos,
const AppInfos& appIdInfos,
bool forceUpdate);

/// Executed by any thread.
Expand Down
96 changes: 34 additions & 62 deletions src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,29 +139,29 @@ void createQueueUriKey(bmqt::Uri* out,
}

void afterAppIdRegisteredDispatched(
mqbi::Queue* queue,
const mqbc::ClusterStateQueueInfo::AppIdInfo& appIdInfo)
mqbi::Queue* queue,
const mqbc::ClusterStateQueueInfo::AppInfo& appIdInfo)
{
// executed by the *QUEUE DISPATCHER* thread

// PRECONDITIONS
BSLS_ASSERT_SAFE(queue->dispatcher()->inDispatcherThread(queue));

queue->queueEngine()->afterAppIdRegistered(
mqbi::Storage::AppIdKeyPair(appIdInfo.first, appIdInfo.second));
mqbi::Storage::AppInfo(appIdInfo.first, appIdInfo.second));
}

void afterAppIdUnregisteredDispatched(
mqbi::Queue* queue,
const mqbc::ClusterStateQueueInfo::AppIdInfo& appIdInfo)
mqbi::Queue* queue,
const mqbc::ClusterStateQueueInfo::AppInfo& appIdInfo)
{
// executed by the *QUEUE DISPATCHER* thread

// PRECONDITIONS
BSLS_ASSERT_SAFE(queue->dispatcher()->inDispatcherThread(queue));

queue->queueEngine()->afterAppIdUnregistered(
mqbi::Storage::AppIdKeyPair(appIdInfo.first, appIdInfo.second));
mqbi::Storage::AppInfo(appIdInfo.first, appIdInfo.second));
}

void handleHolderDummy(const bsl::shared_ptr<mqbi::QueueHandle>& handle)
Expand Down Expand Up @@ -2152,27 +2152,15 @@ bsl::shared_ptr<mqbi::Queue> ClusterQueueHelper::createQueueFactory(
// queue but the queue is never opened, it will not be registered with
// the StorageMgr. This is ok.

if (d_cluster_p->isCSLModeEnabled()) {
const AppIdInfos& appIdInfos =
context.d_queueContext_p->d_stateQInfo_sp->appIdInfos();
const mqbi::Storage::AppIdKeyPairs appIdKeyPairs(
appIdInfos.cbegin(),
appIdInfos.cend());
d_storageManager_p->registerQueue(
context.d_queueContext_p->uri(),
context.d_queueContext_p->key(),
context.d_queueContext_p->partitionId(),
appIdKeyPairs,
context.d_domain_p);
}
else {
d_storageManager_p->registerQueue(
context.d_queueContext_p->uri(),
context.d_queueContext_p->key(),
context.d_queueContext_p->partitionId(),
mqbi::Storage::AppIdKeyPairs(),
context.d_domain_p);
}
// Use keys in the CSL instead of generating new ones to keep CSL and
// non-CSL consistent.

d_storageManager_p->registerQueue(
context.d_queueContext_p->uri(),
context.d_queueContext_p->key(),
context.d_queueContext_p->partitionId(),
context.d_queueContext_p->d_stateQInfo_sp->appInfos(),
context.d_domain_p);

// Queue must have been registered with storage manager before
// registering it with the domain, otherwise Queue.configure() will
Expand Down Expand Up @@ -3698,28 +3686,12 @@ void ClusterQueueHelper::restoreStateCluster(int partitionId)
// node creates a local queue instance (see
// 'createQueueFactory').

if (d_cluster_p->isCSLModeEnabled()) {
const AppIdInfos& appIdInfos =
queueContext->d_stateQInfo_sp->appIdInfos();
const mqbi::Storage::AppIdKeyPairs appIdKeyPairs(
appIdInfos.cbegin(),
appIdInfos.cend());

d_storageManager_p->registerQueue(
queueContext->uri(),
queueContext->key(),
queueContext->partitionId(),
appIdKeyPairs,
qinfo.d_queue_sp->domain());
}
else {
d_storageManager_p->registerQueue(
queueContext->uri(),
queueContext->key(),
queueContext->partitionId(),
mqbi::Storage::AppIdKeyPairs(),
qinfo.d_queue_sp->domain());
}
d_storageManager_p->registerQueue(
queueContext->uri(),
queueContext->key(),
queueContext->partitionId(),
queueContext->d_stateQInfo_sp->appInfos(),
qinfo.d_queue_sp->domain());

// Convert the queue from remote to local instance.
queueContext->d_liveQInfo.d_queue_sp->convertToLocal();
Expand Down Expand Up @@ -4109,6 +4081,7 @@ void ClusterQueueHelper::onQueueAssigned(
BSLS_ASSERT_SAFE(!d_cluster_p->isRemote());

if (!d_cluster_p->isCSLModeEnabled()) {
// REVISIT
return; // RETURN
}

Expand Down Expand Up @@ -4229,14 +4202,11 @@ void ClusterQueueHelper::onQueueAssigned(
->domain(),
true); // allowDuplicate

const mqbi::Storage::AppIdKeyPairs appIdKeyPairs(
info.appIdInfos().cbegin(),
info.appIdInfos().cend());
d_storageManager_p->updateQueueReplica(
info.partitionId(),
info.uri(),
info.key(),
appIdKeyPairs,
info.appInfos(),
d_clusterState_p->domainStates()
.at(info.uri().qualifiedDomain())
->domain(),
Expand Down Expand Up @@ -4395,8 +4365,8 @@ void ClusterQueueHelper::onQueueUnassigned(

void ClusterQueueHelper::onQueueUpdated(const bmqt::Uri& uri,
const bsl::string& domain,
const AppIdInfos& addedAppIds,
const AppIdInfos& removedAppIds)
const AppInfos& addedAppIds,
const AppInfos& removedAppIds)
{
// executed by the cluster *DISPATCHER* thread

Expand Down Expand Up @@ -4424,19 +4394,21 @@ void ClusterQueueHelper::onQueueUpdated(const bmqt::Uri& uri,
const int partitionId = qiter->second->partitionId();
BSLS_ASSERT_SAFE(partitionId != mqbs::DataStore::k_INVALID_PARTITION_ID);

for (AppIdInfosCIter cit = addedAppIds.cbegin(); cit != addedAppIds.cend();
for (AppInfosCIter cit = addedAppIds.cbegin(); cit != addedAppIds.cend();
++cit) {
if (!d_clusterState_p->isSelfPrimary(partitionId) || queue == 0) {
// Note: In non-CSL mode, the queue creation callback is
// invoked at replica nodes when they receive a queue creation
// record from the primary in the partition stream.
mqbi::Storage::AppIdKeyPair appIdKeyPair(cit->first, cit->second);
mqbi::Storage::AppIdKeyPairs appIdKeyPairs(1, appIdKeyPair);

mqbi::Storage::AppInfos one(1, d_allocator_p);
one.emplace(*cit);

d_storageManager_p->updateQueueReplica(
partitionId,
uri,
qiter->second->key(),
appIdKeyPairs,
one,
d_clusterState_p->domainStates()
.at(uri.qualifiedDomain())
->domain());
Expand All @@ -4450,7 +4422,7 @@ void ClusterQueueHelper::onQueueUpdated(const bmqt::Uri& uri,
}
}

for (AppIdInfosCIter cit = removedAppIds.cbegin();
for (AppInfosCIter cit = removedAppIds.cbegin();
cit != removedAppIds.cend();
++cit) {
if (!d_clusterState_p->isSelfPrimary(partitionId) || queue == 0) {
Expand All @@ -4471,8 +4443,8 @@ void ClusterQueueHelper::onQueueUpdated(const bmqt::Uri& uri,
}
}

bmqu::Printer<AppIdInfos> printer1(&addedAppIds);
bmqu::Printer<AppIdInfos> printer2(&removedAppIds);
bmqu::Printer<AppInfos> printer1(&addedAppIds);
bmqu::Printer<AppInfos> printer2(&removedAppIds);
BALL_LOG_INFO << d_cluster_p->description() << ": Updated queue: " << uri
<< ", addedAppIds: " << printer1
<< ", removedAppIds: " << printer2;
Expand Down
8 changes: 4 additions & 4 deletions src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -420,9 +420,9 @@ class ClusterQueueHelper : public mqbc::ClusterStateObserver,
/// queue which have a proper valid unique queueId.
typedef bsl::unordered_map<int, QueueContext*> QueueContextByIdMap;

typedef AppIdInfos::const_iterator AppIdInfosCIter;
typedef AppInfos::const_iterator AppInfosCIter;

typedef mqbc::ClusterStateQueueInfo::AppIdInfos AppIdInfos;
typedef mqbc::ClusterStateQueueInfo::AppInfos AppInfos;

private:
// DATA
Expand Down Expand Up @@ -997,8 +997,8 @@ class ClusterQueueHelper : public mqbc::ClusterStateObserver,
/// dispatcher thread.
virtual void onQueueUpdated(const bmqt::Uri& uri,
const bsl::string& domain,
const AppIdInfos& addedAppIds,
const AppIdInfos& removedAppIds = AppIdInfos())
const AppInfos& addedAppIds,
const AppInfos& removedAppIds = AppInfos())
BSLS_KEYWORD_OVERRIDE;

private:
Expand Down
Loading

0 comments on commit afdbba2

Please sign in to comment.