Skip to content

Commit

Permalink
Handle appKey conflicts
Browse files Browse the repository at this point in the history
Signed-off-by: dorjesinpo <[email protected]>
  • Loading branch information
dorjesinpo committed Dec 4, 2024
1 parent 94d9784 commit 87ab602
Show file tree
Hide file tree
Showing 19 changed files with 198 additions and 142 deletions.
7 changes: 3 additions & 4 deletions src/groups/mqb/mqbblp/mqbblp_queuestate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -239,13 +239,12 @@ void QueueState::loadInternals(mqbcmd::QueueState* out) const
appIdKeyPairs.cbegin();
cit != appIdKeyPairs.cend();
++cit, ++i) {
const mqbi::Storage::AppInfo& p = *cit;
virtualStorages[i].appId() = p.first;
virtualStorages[i].appId() = cit->first;
os.reset();
os << p.second;
os << cit->second;
virtualStorages[i].appKey() = os.str();
virtualStorages[i].numMessages() = d_storage_mp->numMessages(
p.second);
cit->second);
}
}
}
Expand Down
8 changes: 3 additions & 5 deletions src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1604,16 +1604,14 @@ void RelayQueueEngine::loadInternals(mqbcmd::QueueEngine* out) const
appIdKeyPairs.cbegin();
cit != appIdKeyPairs.cend();
++cit) {
const mqbi::Storage::AppInfo& p = *cit;

subStreams.resize(subStreams.size() + 1);
mqbcmd::RelayQueueEngineSubStream& subStream = subStreams.back();
subStream.appId() = p.first;
subStream.appId() = cit->first;
bmqu::MemOutStream appKey;
appKey << p.second;
appKey << cit->second;
subStream.appKey() = appKey.str();
subStream.numMessages() = d_queueState_p->storage()->numMessages(
p.second);
cit->second);
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/groups/mqb/mqbc/mqbc_clusterstate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ int ClusterState::updateQueue(const bmqt::Uri& uri,
for (AppInfosCIter citer = removedAppIds.begin();
citer != removedAppIds.end();
++citer) {
const AppInfosCIter appIdInfoCIter = appIdInfos.find(*citer);
const AppInfosCIter appIdInfoCIter = appIdInfos.find(citer->first);
if (appIdInfoCIter == appIdInfos.cend()) {
return rc_APPID_NOT_FOUND; // RETURN
}
Expand Down
13 changes: 6 additions & 7 deletions src/groups/mqb/mqbc/mqbc_storagemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3641,7 +3641,7 @@ void StorageManager::initializeQueueKeyInfoMap(
for (AppInfosCIter appIdCit = csQinfo.appInfos().cbegin();
appIdCit != csQinfo.appInfos().cend();
++appIdCit) {
qinfo.addAppInfo(*appIdCit);
qinfo.addAppInfo(appIdCit);
}

d_queueKeyInfoMapVec.at(csQinfo.partitionId())
Expand All @@ -3652,12 +3652,11 @@ void StorageManager::initializeQueueKeyInfoMap(
d_isQueueKeyInfoMapVecInitialized = true;
}

void StorageManager::registerQueue(
const bmqt::Uri& uri,
const mqbu::StorageKey& queueKey,
int partitionId,
const bsl::unordered_set<AppInfo>& appIdKeyPairs,
mqbi::Domain* domain)
void StorageManager::registerQueue(const bmqt::Uri& uri,
const mqbu::StorageKey& queueKey,
int partitionId,
const AppInfos& appIdKeyPairs,
mqbi::Domain* domain)
{
// executed by the *CLUSTER DISPATCHER* thread

Expand Down
11 changes: 5 additions & 6 deletions src/groups/mqb/mqbc/mqbc_storagemanager.h
Original file line number Diff line number Diff line change
Expand Up @@ -817,12 +817,11 @@ class StorageManager
/// associated queue storage created.
///
/// THREAD: Executed by the Client's dispatcher thread.
virtual void
registerQueue(const bmqt::Uri& uri,
const mqbu::StorageKey& queueKey,
int partitionId,
const bsl::unordered_set<AppInfo>& appIdKeyPairs,
mqbi::Domain* domain) BSLS_KEYWORD_OVERRIDE;
virtual void registerQueue(const bmqt::Uri& uri,
const mqbu::StorageKey& queueKey,
int partitionId,
const AppInfos& appIdKeyPairs,
mqbi::Domain* domain) BSLS_KEYWORD_OVERRIDE;

/// Synchronously unregister the queue with the specified `uri` from the
/// specified `partitionId`.
Expand Down
138 changes: 116 additions & 22 deletions src/groups/mqb/mqbc/mqbc_storageutil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,86 @@ void optionalSemaphorePost(bslmt::Semaphore* semaphore)
// ------------------

// PRIVATE FUNCTIONS

bool StorageUtil::loadDifference(mqbi::Storage::AppInfos* result,
const mqbi::Storage::AppInfos& baseSet,
const mqbi::Storage::AppInfos& subtractionSet,
bool findConflicts)
{
bool noConflicts = true;
for (mqbi::Storage::AppInfos::const_iterator cit = baseSet.cbegin();
cit != baseSet.cend();
++cit) {
mqbi::Storage::AppInfos::const_iterator match = subtractionSet.find(
cit->first);

if (subtractionSet.end() == match) {
result->emplace(cit->first, cit->second);
}
else if (findConflicts && match->second != cit->second) {
BALL_LOG_ERROR << "appId [" << cit->first
<< "] has conflicting appKeys [" << cit->second
<< " vs " << match->second << "]. Ignoring ["
<< cit->second << "]";
noConflicts = false;
}
}

return noConflicts;
}

void StorageUtil::loadDifference(
bsl::unordered_set<bsl::string>* result,
const bsl::unordered_set<bsl::string>& baseSet,
const bsl::unordered_set<bsl::string>& subtractionSet)
{
for (bsl::unordered_set<bsl::string>::const_iterator cit =
baseSet.cbegin();
cit != baseSet.cend();
++cit) {
if (subtractionSet.end() == subtractionSet.find(*cit)) {
result->emplace(*cit);
}
}
}

bool StorageUtil::loadAddedAndRemovedEntries(
mqbi::Storage::AppInfos* addedEntries,
mqbi::Storage::AppInfos* removedEntries,
const mqbi::Storage::AppInfos& existingEntries,
const mqbi::Storage::AppInfos& newEntries)
{
// PRECONDITIONS
BSLS_ASSERT_SAFE(addedEntries);
BSLS_ASSERT_SAFE(removedEntries);

// Find newly added entries.
bool noConflicts =
loadDifference(addedEntries, newEntries, existingEntries, true);

// Find removed entries.
loadDifference(removedEntries, existingEntries, newEntries);

return noConflicts;
}

void StorageUtil::loadAddedAndRemovedEntries(
bsl::unordered_set<bsl::string>* addedEntries,
bsl::unordered_set<bsl::string>* removedEntries,
const bsl::unordered_set<bsl::string>& existingEntries,
const bsl::unordered_set<bsl::string>& newEntries)
{
// PRECONDITIONS
BSLS_ASSERT_SAFE(addedEntries);
BSLS_ASSERT_SAFE(removedEntries);

// Find newly added entries.
loadDifference(addedEntries, newEntries, existingEntries);

// Find removed entries.
loadDifference(removedEntries, existingEntries, newEntries);
}

bool StorageUtil::loadUpdatedAppInfos(AppInfos* addedAppInfos,
AppInfos* removedAppInfos,
const mqbs::ReplicatedStorage& storage,
Expand Down Expand Up @@ -109,6 +189,9 @@ bool StorageUtil::loadUpdatedAppInfos(AppInfos* addedAppInfos,
existingAppInfos,
newAppInfos);

// TEMPORARY: if duplicate AppKey values exist for the same AppId, ignore
// the one in 'newAppInfos'.

if (addedAppInfos->empty() && removedAppInfos->empty()) {
// No appIds to add or remove.
return false; // RETURN
Expand Down Expand Up @@ -184,8 +267,7 @@ void StorageUtil::updateQueuePrimaryDispatched(
mqbs::FileStore* fs,
const bsl::string& clusterDescription,
int partitionId,
const AppInfos& addedIdKeyPairs,
const AppInfos& removedIdKeyPairs,
const AppInfos& appIdKeyPairs,
bool isFanout)
{
// executed by *QUEUE_DISPATCHER* thread with the specified 'partitionId'
Expand All @@ -198,13 +280,33 @@ void StorageUtil::updateQueuePrimaryDispatched(

bslmt::LockGuard<bslmt::Mutex> guard(storagesLock); // LOCK

AppInfos existingAppInfos;
storage->loadVirtualStorageDetails(&existingAppInfos);

bmqu::Printer<AppInfos> printer2(&existingAppInfos);

BALL_LOG_INFO << clusterDescription << " Partition [" << partitionId
<< "]: Existing queue '" << storage->queueUri()
<< "', queueKey: '" << storage->queueKey() << "' "
<< printer2 << " in the storage.";

AppInfos addedAppInfos, removedAppInfos;

bool hasUpdate = loadUpdatedAppInfos(&addedAppInfos,
&removedAppInfos,
*storage,
appIdKeyPairs);
if (!hasUpdate) {
// No update needed for AppId/Key pairs.
return; // RETURN
}
// Simply forward to 'updateQueuePrimaryRaw'.
updateQueuePrimaryRaw(storage,
fs,
clusterDescription,
partitionId,
addedIdKeyPairs,
removedIdKeyPairs,
addedAppInfos,
removedAppInfos,
isFanout);
}

Expand Down Expand Up @@ -1401,9 +1503,7 @@ void StorageUtil::recoveredQueuesCb(
for (AppInfos::const_iterator cit = qinfo.appIdKeyPairs().cbegin();
cit != qinfo.appIdKeyPairs().cend();
++cit) {
const AppInfo& p = *cit;

AppIdsInsertRc appIdsIrc = appIds.insert(p.first);
AppIdsInsertRc appIdsIrc = appIds.insert(cit->first);
if (false == appIdsIrc.second) {
// Duplicate AppId.

Expand All @@ -1413,7 +1513,7 @@ void StorageUtil::recoveredQueuesCb(
<< "encountered a duplicate AppId while processing "
<< "recovered queue [" << uri << "], " << "queueKey ["
<< qit->first << "]. AppId [" << *(appIdsIrc.first)
<< "]. AppKey [" << p.second << "]."
<< "]. AppKey [" << cit->second << "]."
<< BMQTSK_ALARMLOG_END;
mqbu::ExitUtil::terminate(
mqbu::ExitCode::e_RECOVERY_FAILURE);
Expand Down Expand Up @@ -2225,6 +2325,12 @@ void StorageUtil::registerQueue(
const mqbconfm::StorageDefinition& storageDef = domain->config().storage();
const mqbconfm::QueueMode& queueMode = domain->config().mode();

bmqu::Printer<AppInfos> printer1(&appIdKeyPairs);

BALL_LOG_INFO << clusterDescription << " Partition [" << partitionId
<< "]: Registering queue '" << uri << "', queueKey: '"
<< queueKey << "' " << printer1 << " to the storage.";

if (queueMode.isUndefinedValue()) {
BMQTSK_ALARMLOG_ALARM("STORAGE")
<< "Partition [" << partitionId
Expand Down Expand Up @@ -2280,18 +2386,7 @@ void StorageUtil::registerQueue(
// to be added or removed (see comments at the beginning of this
// routine for explanation).

AppInfos addedAppInfos, removedAppInfos;

bool hasUpdate = loadUpdatedAppInfos(&addedAppInfos,
&removedAppInfos,
*storageSp.get(),
appIdKeyPairs);
if (!hasUpdate) {
// No update needed for AppId/Key pairs.
return; // RETURN
}

// Some AppId/Key pairs need to be updated. Invoke
// Invoke
// 'updateQueuePrimaryDispatched' in the right thread to carry out
// the addition/removal of those pairs.

Expand All @@ -2308,8 +2403,7 @@ void StorageUtil::registerQueue(
fs,
clusterDescription,
partitionId,
addedAppInfos,
removedAppInfos,
appIdKeyPairs,
domain->config().mode().isFanoutValue()));

dispatcher->dispatchEvent(queueEvent,
Expand Down
Loading

0 comments on commit 87ab602

Please sign in to comment.