Skip to content

Commit

Permalink
Fix: handle appKey conflicts (#536)
Browse files Browse the repository at this point in the history
* Handle appKey conflicts

Signed-off-by: dorjesinpo <[email protected]>

* Addressing review

Signed-off-by: dorjesinpo <[email protected]>

---------

Signed-off-by: dorjesinpo <[email protected]>
  • Loading branch information
dorjesinpo authored Dec 11, 2024
1 parent 0d28d61 commit fae5ec1
Show file tree
Hide file tree
Showing 20 changed files with 210 additions and 157 deletions.
3 changes: 3 additions & 0 deletions src/groups/mqb/mqbblp/mqbblp_cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2210,6 +2210,9 @@ void Cluster::onRecoveryStatusDispatched(
BSLS_ASSERT_SAFE(itMp->storage()->partitionId() ==
static_cast<int>(pid));

// TODO: wrong thread to call 'loadVirtualStorageDetails'
// but 'onRecoveryStatusDispatched' should not be concurrent
// with any of 'add/removeVirtualStorage' calls.
AppInfos appIdInfos;
itMp->storage()->loadVirtualStorageDetails(&appIdInfos);

Expand Down
7 changes: 3 additions & 4 deletions src/groups/mqb/mqbblp/mqbblp_queuestate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -239,13 +239,12 @@ void QueueState::loadInternals(mqbcmd::QueueState* out) const
appIdKeyPairs.cbegin();
cit != appIdKeyPairs.cend();
++cit, ++i) {
const mqbi::Storage::AppInfo& p = *cit;
virtualStorages[i].appId() = p.first;
virtualStorages[i].appId() = cit->first;
os.reset();
os << p.second;
os << cit->second;
virtualStorages[i].appKey() = os.str();
virtualStorages[i].numMessages() = d_storage_mp->numMessages(
p.second);
cit->second);
}
}
}
Expand Down
8 changes: 3 additions & 5 deletions src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1604,16 +1604,14 @@ void RelayQueueEngine::loadInternals(mqbcmd::QueueEngine* out) const
appIdKeyPairs.cbegin();
cit != appIdKeyPairs.cend();
++cit) {
const mqbi::Storage::AppInfo& p = *cit;

subStreams.resize(subStreams.size() + 1);
mqbcmd::RelayQueueEngineSubStream& subStream = subStreams.back();
subStream.appId() = p.first;
subStream.appId() = cit->first;
bmqu::MemOutStream appKey;
appKey << p.second;
appKey << cit->second;
subStream.appKey() = appKey.str();
subStream.numMessages() = d_queueState_p->storage()->numMessages(
p.second);
cit->second);
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/groups/mqb/mqbc/mqbc_clusterstate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ int ClusterState::updateQueue(const bmqt::Uri& uri,
for (AppInfosCIter citer = removedAppIds.begin();
citer != removedAppIds.end();
++citer) {
const AppInfosCIter appIdInfoCIter = appIdInfos.find(*citer);
const AppInfosCIter appIdInfoCIter = appIdInfos.find(citer->first);
if (appIdInfoCIter == appIdInfos.cend()) {
return rc_APPID_NOT_FOUND; // RETURN
}
Expand Down
13 changes: 6 additions & 7 deletions src/groups/mqb/mqbc/mqbc_storagemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3639,7 +3639,7 @@ void StorageManager::initializeQueueKeyInfoMap(
for (AppInfosCIter appIdCit = csQinfo.appInfos().cbegin();
appIdCit != csQinfo.appInfos().cend();
++appIdCit) {
qinfo.addAppInfo(*appIdCit);
qinfo.addAppInfo(appIdCit);
}

d_queueKeyInfoMapVec.at(csQinfo.partitionId())
Expand All @@ -3650,12 +3650,11 @@ void StorageManager::initializeQueueKeyInfoMap(
d_isQueueKeyInfoMapVecInitialized = true;
}

void StorageManager::registerQueue(
const bmqt::Uri& uri,
const mqbu::StorageKey& queueKey,
int partitionId,
const bsl::unordered_set<AppInfo>& appIdKeyPairs,
mqbi::Domain* domain)
void StorageManager::registerQueue(const bmqt::Uri& uri,
const mqbu::StorageKey& queueKey,
int partitionId,
const AppInfos& appIdKeyPairs,
mqbi::Domain* domain)
{
// executed by the *CLUSTER DISPATCHER* thread

Expand Down
10 changes: 5 additions & 5 deletions src/groups/mqb/mqbc/mqbc_storagemanager.h
Original file line number Diff line number Diff line change
Expand Up @@ -808,11 +808,11 @@ class StorageManager BSLS_KEYWORD_FINAL
/// associated queue storage created.
///
/// THREAD: Executed by the Client's dispatcher thread.
void registerQueue(const bmqt::Uri& uri,
const mqbu::StorageKey& queueKey,
int partitionId,
const bsl::unordered_set<AppInfo>& appIdKeyPairs,
mqbi::Domain* domain) BSLS_KEYWORD_OVERRIDE;
void registerQueue(const bmqt::Uri& uri,
const mqbu::StorageKey& queueKey,
int partitionId,
const AppInfos& appIdKeyPairs,
mqbi::Domain* domain) BSLS_KEYWORD_OVERRIDE;

/// Synchronously unregister the queue with the specified `uri` from the
/// specified `partitionId`.
Expand Down
149 changes: 120 additions & 29 deletions src/groups/mqb/mqbc/mqbc_storageutil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,92 @@ void optionalSemaphorePost(bslmt::Semaphore* semaphore)
// ------------------

// PRIVATE FUNCTIONS
bool StorageUtil::loadUpdatedAppInfos(AppInfos* addedAppInfos,
AppInfos* removedAppInfos,
const mqbs::ReplicatedStorage& storage,

bool StorageUtil::loadDifference(mqbi::Storage::AppInfos* result,
const mqbi::Storage::AppInfos& baseSet,
const mqbi::Storage::AppInfos& subtractionSet,
bool findConflicts)
{
bool noConflicts = true;
for (mqbi::Storage::AppInfos::const_iterator cit = baseSet.cbegin();
cit != baseSet.cend();
++cit) {
mqbi::Storage::AppInfos::const_iterator match = subtractionSet.find(
cit->first);

if (subtractionSet.end() == match) {
result->emplace(cit->first, cit->second);
}
else if (findConflicts && match->second != cit->second) {
BALL_LOG_ERROR << "appId [" << cit->first
<< "] has conflicting appKeys [" << cit->second
<< " vs " << match->second << "]. Ignoring ["
<< cit->second << "]";
noConflicts = false;
}
}

return noConflicts;
}

void StorageUtil::loadDifference(
bsl::unordered_set<bsl::string>* result,
const bsl::unordered_set<bsl::string>& baseSet,
const bsl::unordered_set<bsl::string>& subtractionSet)
{
for (bsl::unordered_set<bsl::string>::const_iterator cit =
baseSet.cbegin();
cit != baseSet.cend();
++cit) {
if (subtractionSet.end() == subtractionSet.find(*cit)) {
result->emplace(*cit);
}
}
}

bool StorageUtil::loadAddedAndRemovedEntries(
mqbi::Storage::AppInfos* addedEntries,
mqbi::Storage::AppInfos* removedEntries,
const mqbi::Storage::AppInfos& existingEntries,
const mqbi::Storage::AppInfos& newEntries)
{
// PRECONDITIONS
BSLS_ASSERT_SAFE(addedEntries);
BSLS_ASSERT_SAFE(removedEntries);

// Find newly added entries.
bool noConflicts =
loadDifference(addedEntries, newEntries, existingEntries, true);

// Find removed entries.
loadDifference(removedEntries, existingEntries, newEntries, false);

return noConflicts;
}

void StorageUtil::loadAddedAndRemovedEntries(
bsl::unordered_set<bsl::string>* addedEntries,
bsl::unordered_set<bsl::string>* removedEntries,
const bsl::unordered_set<bsl::string>& existingEntries,
const bsl::unordered_set<bsl::string>& newEntries)
{
// PRECONDITIONS
BSLS_ASSERT_SAFE(addedEntries);
BSLS_ASSERT_SAFE(removedEntries);

// Find newly added entries.
loadDifference(addedEntries, newEntries, existingEntries);

// Find removed entries.
loadDifference(removedEntries, existingEntries, newEntries);
}

bool StorageUtil::loadUpdatedAppInfos(AppInfos* addedAppInfos,
AppInfos* removedAppInfos,
const AppInfos& existingAppInfos,
const AppInfos& newAppInfos)
{
// executed by the *CLUSTER DISPATCHER* thread
// executed by the *QUEUE_DISPATCHER* thread

// PRECONDITIONS
BSLS_ASSERT_SAFE(addedAppInfos);
Expand All @@ -101,14 +181,14 @@ bool StorageUtil::loadUpdatedAppInfos(AppInfos* addedAppInfos,
// list of newly added and removed appIds, and then invoking 'updateQueue'
// in the appropriate thread.

AppInfos existingAppInfos;
storage.loadVirtualStorageDetails(&existingAppInfos);

loadAddedAndRemovedEntries(addedAppInfos,
removedAppInfos,
existingAppInfos,
newAppInfos);

// TEMPORARY: if duplicate AppKey values exist for the same AppId, ignore
// the one in 'newAppInfos'.

if (addedAppInfos->empty() && removedAppInfos->empty()) {
// No appIds to add or remove.
return false; // RETURN
Expand Down Expand Up @@ -184,8 +264,7 @@ void StorageUtil::updateQueuePrimaryDispatched(
mqbs::FileStore* fs,
const bsl::string& clusterDescription,
int partitionId,
const AppInfos& addedIdKeyPairs,
const AppInfos& removedIdKeyPairs,
const AppInfos& appIdKeyPairs,
bool isFanout)
{
// executed by *QUEUE_DISPATCHER* thread with the specified 'partitionId'
Expand All @@ -198,13 +277,33 @@ void StorageUtil::updateQueuePrimaryDispatched(

bslmt::LockGuard<bslmt::Mutex> guard(storagesLock); // LOCK

AppInfos existingAppInfos;
storage->loadVirtualStorageDetails(&existingAppInfos);

bmqu::Printer<AppInfos> printer2(&existingAppInfos);

BALL_LOG_INFO << clusterDescription << " Partition [" << partitionId
<< "]: Existing queue '" << storage->queueUri()
<< "', queueKey: '" << storage->queueKey() << "' "
<< printer2 << " in the storage.";

AppInfos addedAppInfos, removedAppInfos;

bool hasUpdate = loadUpdatedAppInfos(&addedAppInfos,
&removedAppInfos,
existingAppInfos,
appIdKeyPairs);
if (!hasUpdate) {
// No update needed for AppId/Key pairs.
return; // RETURN
}
// Simply forward to 'updateQueuePrimaryRaw'.
updateQueuePrimaryRaw(storage,
fs,
clusterDescription,
partitionId,
addedIdKeyPairs,
removedIdKeyPairs,
addedAppInfos,
removedAppInfos,
isFanout);
}

Expand Down Expand Up @@ -1401,9 +1500,7 @@ void StorageUtil::recoveredQueuesCb(
for (AppInfos::const_iterator cit = qinfo.appIdKeyPairs().cbegin();
cit != qinfo.appIdKeyPairs().cend();
++cit) {
const AppInfo& p = *cit;

AppIdsInsertRc appIdsIrc = appIds.insert(p.first);
AppIdsInsertRc appIdsIrc = appIds.insert(cit->first);
if (false == appIdsIrc.second) {
// Duplicate AppId.

Expand All @@ -1413,7 +1510,7 @@ void StorageUtil::recoveredQueuesCb(
<< "encountered a duplicate AppId while processing "
<< "recovered queue [" << uri << "], " << "queueKey ["
<< qit->first << "]. AppId [" << *(appIdsIrc.first)
<< "]. AppKey [" << p.second << "]."
<< "]. AppKey [" << cit->second << "]."
<< BMQTSK_ALARMLOG_END;
mqbu::ExitUtil::terminate(
mqbu::ExitCode::e_RECOVERY_FAILURE);
Expand Down Expand Up @@ -2225,6 +2322,12 @@ void StorageUtil::registerQueue(
const mqbconfm::StorageDefinition& storageDef = domain->config().storage();
const mqbconfm::QueueMode& queueMode = domain->config().mode();

bmqu::Printer<AppInfos> printer1(&appIdKeyPairs);

BALL_LOG_INFO << clusterDescription << " Partition [" << partitionId
<< "]: Registering queue '" << uri << "', queueKey: '"
<< queueKey << "' " << printer1 << " to the storage.";

if (queueMode.isUndefinedValue()) {
BMQTSK_ALARMLOG_ALARM("STORAGE")
<< "Partition [" << partitionId
Expand Down Expand Up @@ -2280,18 +2383,7 @@ void StorageUtil::registerQueue(
// to be added or removed (see comments at the beginning of this
// routine for explanation).

AppInfos addedAppInfos, removedAppInfos;

bool hasUpdate = loadUpdatedAppInfos(&addedAppInfos,
&removedAppInfos,
*storageSp.get(),
appIdKeyPairs);
if (!hasUpdate) {
// No update needed for AppId/Key pairs.
return; // RETURN
}

// Some AppId/Key pairs need to be updated. Invoke
// Invoke
// 'updateQueuePrimaryDispatched' in the right thread to carry out
// the addition/removal of those pairs.

Expand All @@ -2308,8 +2400,7 @@ void StorageUtil::registerQueue(
fs,
clusterDescription,
partitionId,
addedAppInfos,
removedAppInfos,
appIdKeyPairs,
domain->config().mode().isFanoutValue()));

dispatcher->dispatchEvent(queueEvent,
Expand Down
Loading

0 comments on commit fae5ec1

Please sign in to comment.