diff --git a/src/groups/mqb/mqbblp/mqbblp_cluster.cpp b/src/groups/mqb/mqbblp/mqbblp_cluster.cpp index 2362c4f415..059227215c 100644 --- a/src/groups/mqb/mqbblp/mqbblp_cluster.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_cluster.cpp @@ -2210,6 +2210,9 @@ void Cluster::onRecoveryStatusDispatched( BSLS_ASSERT_SAFE(itMp->storage()->partitionId() == static_cast(pid)); + // TODO: wrong thread to call 'loadVirtualStorageDetails' + // but 'onRecoveryStatusDispatched' should not be concurrent + // with any of 'add/removeVirtualStorage' calls. AppInfos appIdInfos; itMp->storage()->loadVirtualStorageDetails(&appIdInfos); diff --git a/src/groups/mqb/mqbblp/mqbblp_queuestate.cpp b/src/groups/mqb/mqbblp/mqbblp_queuestate.cpp index b371298378..df4e3e1c00 100644 --- a/src/groups/mqb/mqbblp/mqbblp_queuestate.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_queuestate.cpp @@ -239,13 +239,12 @@ void QueueState::loadInternals(mqbcmd::QueueState* out) const appIdKeyPairs.cbegin(); cit != appIdKeyPairs.cend(); ++cit, ++i) { - const mqbi::Storage::AppInfo& p = *cit; - virtualStorages[i].appId() = p.first; + virtualStorages[i].appId() = cit->first; os.reset(); - os << p.second; + os << cit->second; virtualStorages[i].appKey() = os.str(); virtualStorages[i].numMessages() = d_storage_mp->numMessages( - p.second); + cit->second); } } } diff --git a/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp b/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp index 48acd1abaa..4a7b1bf6f4 100644 --- a/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp @@ -1604,16 +1604,14 @@ void RelayQueueEngine::loadInternals(mqbcmd::QueueEngine* out) const appIdKeyPairs.cbegin(); cit != appIdKeyPairs.cend(); ++cit) { - const mqbi::Storage::AppInfo& p = *cit; - subStreams.resize(subStreams.size() + 1); mqbcmd::RelayQueueEngineSubStream& subStream = subStreams.back(); - subStream.appId() = p.first; + subStream.appId() = cit->first; bmqu::MemOutStream appKey; - appKey << p.second; + appKey << cit->second; subStream.appKey() = appKey.str(); subStream.numMessages() = d_queueState_p->storage()->numMessages( - p.second); + cit->second); } } diff --git a/src/groups/mqb/mqbc/mqbc_clusterstate.cpp b/src/groups/mqb/mqbc/mqbc_clusterstate.cpp index b0615f9100..3d47083e03 100644 --- a/src/groups/mqb/mqbc/mqbc_clusterstate.cpp +++ b/src/groups/mqb/mqbc/mqbc_clusterstate.cpp @@ -499,7 +499,7 @@ int ClusterState::updateQueue(const bmqt::Uri& uri, for (AppInfosCIter citer = removedAppIds.begin(); citer != removedAppIds.end(); ++citer) { - const AppInfosCIter appIdInfoCIter = appIdInfos.find(*citer); + const AppInfosCIter appIdInfoCIter = appIdInfos.find(citer->first); if (appIdInfoCIter == appIdInfos.cend()) { return rc_APPID_NOT_FOUND; // RETURN } diff --git a/src/groups/mqb/mqbc/mqbc_storagemanager.cpp b/src/groups/mqb/mqbc/mqbc_storagemanager.cpp index c97387cfc1..0662ee8908 100644 --- a/src/groups/mqb/mqbc/mqbc_storagemanager.cpp +++ b/src/groups/mqb/mqbc/mqbc_storagemanager.cpp @@ -3639,7 +3639,7 @@ void StorageManager::initializeQueueKeyInfoMap( for (AppInfosCIter appIdCit = csQinfo.appInfos().cbegin(); appIdCit != csQinfo.appInfos().cend(); ++appIdCit) { - qinfo.addAppInfo(*appIdCit); + qinfo.addAppInfo(appIdCit); } d_queueKeyInfoMapVec.at(csQinfo.partitionId()) @@ -3650,12 +3650,11 @@ void StorageManager::initializeQueueKeyInfoMap( d_isQueueKeyInfoMapVecInitialized = true; } -void StorageManager::registerQueue( - const bmqt::Uri& uri, - const mqbu::StorageKey& queueKey, - int partitionId, - const bsl::unordered_set& appIdKeyPairs, - mqbi::Domain* domain) +void StorageManager::registerQueue(const bmqt::Uri& uri, + const mqbu::StorageKey& queueKey, + int partitionId, + const AppInfos& appIdKeyPairs, + mqbi::Domain* domain) { // executed by the *CLUSTER DISPATCHER* thread diff --git a/src/groups/mqb/mqbc/mqbc_storagemanager.h b/src/groups/mqb/mqbc/mqbc_storagemanager.h index ca991c6205..e210f11352 100644 --- a/src/groups/mqb/mqbc/mqbc_storagemanager.h +++ b/src/groups/mqb/mqbc/mqbc_storagemanager.h @@ -808,11 +808,11 @@ class StorageManager BSLS_KEYWORD_FINAL /// associated queue storage created. /// /// THREAD: Executed by the Client's dispatcher thread. - void registerQueue(const bmqt::Uri& uri, - const mqbu::StorageKey& queueKey, - int partitionId, - const bsl::unordered_set& appIdKeyPairs, - mqbi::Domain* domain) BSLS_KEYWORD_OVERRIDE; + void registerQueue(const bmqt::Uri& uri, + const mqbu::StorageKey& queueKey, + int partitionId, + const AppInfos& appIdKeyPairs, + mqbi::Domain* domain) BSLS_KEYWORD_OVERRIDE; /// Synchronously unregister the queue with the specified `uri` from the /// specified `partitionId`. diff --git a/src/groups/mqb/mqbc/mqbc_storageutil.cpp b/src/groups/mqb/mqbc/mqbc_storageutil.cpp index 649ec946a9..7cca27ca09 100644 --- a/src/groups/mqb/mqbc/mqbc_storageutil.cpp +++ b/src/groups/mqb/mqbc/mqbc_storageutil.cpp @@ -75,12 +75,92 @@ void optionalSemaphorePost(bslmt::Semaphore* semaphore) // ------------------ // PRIVATE FUNCTIONS -bool StorageUtil::loadUpdatedAppInfos(AppInfos* addedAppInfos, - AppInfos* removedAppInfos, - const mqbs::ReplicatedStorage& storage, + +bool StorageUtil::loadDifference(mqbi::Storage::AppInfos* result, + const mqbi::Storage::AppInfos& baseSet, + const mqbi::Storage::AppInfos& subtractionSet, + bool findConflicts) +{ + bool noConflicts = true; + for (mqbi::Storage::AppInfos::const_iterator cit = baseSet.cbegin(); + cit != baseSet.cend(); + ++cit) { + mqbi::Storage::AppInfos::const_iterator match = subtractionSet.find( + cit->first); + + if (subtractionSet.end() == match) { + result->emplace(cit->first, cit->second); + } + else if (findConflicts && match->second != cit->second) { + BALL_LOG_ERROR << "appId [" << cit->first + << "] has conflicting appKeys [" << cit->second + << " vs " << match->second << "]. Ignoring [" + << cit->second << "]"; + noConflicts = false; + } + } + + return noConflicts; +} + +void StorageUtil::loadDifference( + bsl::unordered_set* result, + const bsl::unordered_set& baseSet, + const bsl::unordered_set& subtractionSet) +{ + for (bsl::unordered_set::const_iterator cit = + baseSet.cbegin(); + cit != baseSet.cend(); + ++cit) { + if (subtractionSet.end() == subtractionSet.find(*cit)) { + result->emplace(*cit); + } + } +} + +bool StorageUtil::loadAddedAndRemovedEntries( + mqbi::Storage::AppInfos* addedEntries, + mqbi::Storage::AppInfos* removedEntries, + const mqbi::Storage::AppInfos& existingEntries, + const mqbi::Storage::AppInfos& newEntries) +{ + // PRECONDITIONS + BSLS_ASSERT_SAFE(addedEntries); + BSLS_ASSERT_SAFE(removedEntries); + + // Find newly added entries. + bool noConflicts = + loadDifference(addedEntries, newEntries, existingEntries, true); + + // Find removed entries. + loadDifference(removedEntries, existingEntries, newEntries, false); + + return noConflicts; +} + +void StorageUtil::loadAddedAndRemovedEntries( + bsl::unordered_set* addedEntries, + bsl::unordered_set* removedEntries, + const bsl::unordered_set& existingEntries, + const bsl::unordered_set& newEntries) +{ + // PRECONDITIONS + BSLS_ASSERT_SAFE(addedEntries); + BSLS_ASSERT_SAFE(removedEntries); + + // Find newly added entries. + loadDifference(addedEntries, newEntries, existingEntries); + + // Find removed entries. + loadDifference(removedEntries, existingEntries, newEntries); +} + +bool StorageUtil::loadUpdatedAppInfos(AppInfos* addedAppInfos, + AppInfos* removedAppInfos, + const AppInfos& existingAppInfos, const AppInfos& newAppInfos) { - // executed by the *CLUSTER DISPATCHER* thread + // executed by the *QUEUE_DISPATCHER* thread // PRECONDITIONS BSLS_ASSERT_SAFE(addedAppInfos); @@ -101,14 +181,14 @@ bool StorageUtil::loadUpdatedAppInfos(AppInfos* addedAppInfos, // list of newly added and removed appIds, and then invoking 'updateQueue' // in the appropriate thread. - AppInfos existingAppInfos; - storage.loadVirtualStorageDetails(&existingAppInfos); - loadAddedAndRemovedEntries(addedAppInfos, removedAppInfos, existingAppInfos, newAppInfos); + // TEMPORARY: if duplicate AppKey values exist for the same AppId, ignore + // the one in 'newAppInfos'. + if (addedAppInfos->empty() && removedAppInfos->empty()) { // No appIds to add or remove. return false; // RETURN @@ -184,8 +264,7 @@ void StorageUtil::updateQueuePrimaryDispatched( mqbs::FileStore* fs, const bsl::string& clusterDescription, int partitionId, - const AppInfos& addedIdKeyPairs, - const AppInfos& removedIdKeyPairs, + const AppInfos& appIdKeyPairs, bool isFanout) { // executed by *QUEUE_DISPATCHER* thread with the specified 'partitionId' @@ -198,13 +277,33 @@ void StorageUtil::updateQueuePrimaryDispatched( bslmt::LockGuard guard(storagesLock); // LOCK + AppInfos existingAppInfos; + storage->loadVirtualStorageDetails(&existingAppInfos); + + bmqu::Printer printer2(&existingAppInfos); + + BALL_LOG_INFO << clusterDescription << " Partition [" << partitionId + << "]: Existing queue '" << storage->queueUri() + << "', queueKey: '" << storage->queueKey() << "' " + << printer2 << " in the storage."; + + AppInfos addedAppInfos, removedAppInfos; + + bool hasUpdate = loadUpdatedAppInfos(&addedAppInfos, + &removedAppInfos, + existingAppInfos, + appIdKeyPairs); + if (!hasUpdate) { + // No update needed for AppId/Key pairs. + return; // RETURN + } // Simply forward to 'updateQueuePrimaryRaw'. updateQueuePrimaryRaw(storage, fs, clusterDescription, partitionId, - addedIdKeyPairs, - removedIdKeyPairs, + addedAppInfos, + removedAppInfos, isFanout); } @@ -1401,9 +1500,7 @@ void StorageUtil::recoveredQueuesCb( for (AppInfos::const_iterator cit = qinfo.appIdKeyPairs().cbegin(); cit != qinfo.appIdKeyPairs().cend(); ++cit) { - const AppInfo& p = *cit; - - AppIdsInsertRc appIdsIrc = appIds.insert(p.first); + AppIdsInsertRc appIdsIrc = appIds.insert(cit->first); if (false == appIdsIrc.second) { // Duplicate AppId. @@ -1413,7 +1510,7 @@ void StorageUtil::recoveredQueuesCb( << "encountered a duplicate AppId while processing " << "recovered queue [" << uri << "], " << "queueKey [" << qit->first << "]. AppId [" << *(appIdsIrc.first) - << "]. AppKey [" << p.second << "]." + << "]. AppKey [" << cit->second << "]." << BMQTSK_ALARMLOG_END; mqbu::ExitUtil::terminate( mqbu::ExitCode::e_RECOVERY_FAILURE); @@ -2225,6 +2322,12 @@ void StorageUtil::registerQueue( const mqbconfm::StorageDefinition& storageDef = domain->config().storage(); const mqbconfm::QueueMode& queueMode = domain->config().mode(); + bmqu::Printer printer1(&appIdKeyPairs); + + BALL_LOG_INFO << clusterDescription << " Partition [" << partitionId + << "]: Registering queue '" << uri << "', queueKey: '" + << queueKey << "' " << printer1 << " to the storage."; + if (queueMode.isUndefinedValue()) { BMQTSK_ALARMLOG_ALARM("STORAGE") << "Partition [" << partitionId @@ -2280,18 +2383,7 @@ void StorageUtil::registerQueue( // to be added or removed (see comments at the beginning of this // routine for explanation). - AppInfos addedAppInfos, removedAppInfos; - - bool hasUpdate = loadUpdatedAppInfos(&addedAppInfos, - &removedAppInfos, - *storageSp.get(), - appIdKeyPairs); - if (!hasUpdate) { - // No update needed for AppId/Key pairs. - return; // RETURN - } - - // Some AppId/Key pairs need to be updated. Invoke + // Invoke // 'updateQueuePrimaryDispatched' in the right thread to carry out // the addition/removal of those pairs. @@ -2308,8 +2400,7 @@ void StorageUtil::registerQueue( fs, clusterDescription, partitionId, - addedAppInfos, - removedAppInfos, + appIdKeyPairs, domain->config().mode().isFanoutValue())); dispatcher->dispatchEvent(queueEvent, diff --git a/src/groups/mqb/mqbc/mqbc_storageutil.h b/src/groups/mqb/mqbc/mqbc_storageutil.h index dd4f801e01..993bdf2d2a 100644 --- a/src/groups/mqb/mqbc/mqbc_storageutil.h +++ b/src/groups/mqb/mqbc/mqbc_storageutil.h @@ -101,7 +101,6 @@ struct StorageUtil { private: // TYPES - typedef mqbi::StorageManager::AppInfo AppInfo; typedef mqbi::StorageManager::AppInfos AppInfos; typedef mqbi::StorageManager::AppInfosCIter AppInfosCIter; @@ -171,23 +170,33 @@ struct StorageUtil { private: // PRIVATE FUNCTIONS + /// Load into the specified `result` the list of elements present in + /// `baseSet` which are not present in `subtractionSet`. If the specified + /// `findConflicts` is `true`, detect appKey mismatch between the same + /// appId in the `baseSet` and `subtractionSet` and return `false` if + /// appKey values do not match. Otherwise, return `true`. + static bool loadDifference(mqbi::Storage::AppInfos* result, + const mqbi::Storage::AppInfos& baseSet, + const mqbi::Storage::AppInfos& subtractionSet, + bool findConflicts); + /// Load into the specified `result` the list of elements present in /// `baseSet` which are not present in `subtractionSet`. - template - static void loadDifference(bsl::unordered_set* result, - const bsl::unordered_set& baseSet, - const bsl::unordered_set& subtractionSet); + static void + loadDifference(bsl::unordered_set* result, + const bsl::unordered_set& baseSet, + const bsl::unordered_set& subtractionSet); /// Load into the specified `addedAppInfos` and /// `removedAppInfos` the appId/key pairs which have been added and - /// removed respectively for the specified `storage` based on the + /// removed respectively for the specified `existingAppInfos` based on the /// specified `newAppInfos`. Return true if there are any added or removed /// appId/key pairs, false otherwise. /// - /// THREAD: Executed by the cluster dispatcher thread. - static bool loadUpdatedAppInfos(AppInfos* addedAppInfos, - AppInfos* removedAppInfos, - const mqbs::ReplicatedStorage& storage, + /// THREAD: Executed by the queue dispatcher thread. + static bool loadUpdatedAppInfos(AppInfos* addedAppInfos, + AppInfos* removedAppInfos, + const AppInfos& existingAppInfos, const AppInfos& newAppInfos); /// THREAD: Executed by the Queue's dispatcher thread. @@ -207,8 +216,7 @@ struct StorageUtil { mqbs::FileStore* fs, const bsl::string& clusterDescription, int partitionId, - const AppInfos& addedIdKeyPairs, - const AppInfos& removedIdKeyPairs, + const AppInfos& appIdKeyPairs, bool isFanout); /// StorageManager's storages lock must be locked before calling this @@ -359,16 +367,27 @@ struct StorageUtil { public: // FUNCTIONS + /// Load into the specified `addedEntries` the list of entries which are + /// present in `newEntries` but not in `existingEntries`. Similarly, load + /// into the specified `removedEntries` the list of entries which are + /// present in `existingEntries` but not in `newEntries`. Return `false` + /// if appKey values do not match for the same appId in the `newEntries` + /// and the `existingEntries`. Otherwise, return `true`. + static bool + loadAddedAndRemovedEntries(mqbi::Storage::AppInfos* addedEntries, + mqbi::Storage::AppInfos* removedEntries, + const mqbi::Storage::AppInfos& existingEntries, + const mqbi::Storage::AppInfos& newEntries); + /// Load into the specified `addedEntries` the list of entries which are /// present in `newEntries` but not in `existingEntries`. Similarly, load /// into the specified `removedEntries` the list of entries which are /// present in `existingEntries` but not in `newEntries`. - template - static void - loadAddedAndRemovedEntries(bsl::unordered_set* addedEntries, - bsl::unordered_set* removedEntries, - const bsl::unordered_set& existingEntries, - const bsl::unordered_set& newEntries); + static void loadAddedAndRemovedEntries( + bsl::unordered_set* addedEntries, + bsl::unordered_set* removedEntries, + const bsl::unordered_set& existingEntries, + const bsl::unordered_set& newEntries); /// Return true if the queue having specified `uri` and assigned to the /// specified `partitionId` has no messages in the specified @@ -786,41 +805,6 @@ unsigned int StorageUtil::extractPartitionId(const bmqp::Event& event); // struct StorageUtil // ------------------ -template -void StorageUtil::loadDifference(bsl::unordered_set* result, - const bsl::unordered_set& baseSet, - const bsl::unordered_set& subtractionSet) -{ - // PRECONDITIONS - BSLS_ASSERT_SAFE(result); - - typedef typename bsl::unordered_set::const_iterator CIter; - - for (CIter it = baseSet.cbegin(); it != baseSet.cend(); ++it) { - if (subtractionSet.end() == subtractionSet.find(*it)) { - result->emplace(*it); - } - } -} - -template -void StorageUtil::loadAddedAndRemovedEntries( - bsl::unordered_set* addedEntries, - bsl::unordered_set* removedEntries, - const bsl::unordered_set& existingEntries, - const bsl::unordered_set& newEntries) -{ - // PRECONDITIONS - BSLS_ASSERT_SAFE(addedEntries); - BSLS_ASSERT_SAFE(removedEntries); - - // Find newly added entries. - loadDifference(addedEntries, newEntries, existingEntries); - - // Find removed entries. - loadDifference(removedEntries, existingEntries, newEntries); -} - template unsigned int StorageUtil::extractPartitionId(const bmqp::Event& event) { diff --git a/src/groups/mqb/mqbi/mqbi_clusterstatemanager.h b/src/groups/mqb/mqbi/mqbi_clusterstatemanager.h index c3e3c13dd5..c226f937fc 100644 --- a/src/groups/mqb/mqbi/mqbi_clusterstatemanager.h +++ b/src/groups/mqb/mqbi/mqbi_clusterstatemanager.h @@ -87,7 +87,7 @@ class ClusterStateManager { /// Pair of (appId, appKey) typedef bsl::pair AppInfo; - typedef bsl::unordered_set AppInfos; + typedef bsl::unordered_map AppInfos; typedef AppInfos::const_iterator AppInfosCIter; struct QueueAssignmentResult { diff --git a/src/groups/mqb/mqbi/mqbi_storage.h b/src/groups/mqb/mqbi/mqbi_storage.h index f20d161d3a..95267c8af7 100644 --- a/src/groups/mqb/mqbi/mqbi_storage.h +++ b/src/groups/mqb/mqbi/mqbi_storage.h @@ -375,12 +375,8 @@ class Storage { public: // PUBLIC TYPES - /// `AppInfo` is an alias for an (appId, appKey) pairing - /// representing unique virtual storage identification. - typedef bsl::pair AppInfo; - - /// `AppInfos` is an alias for a set of pairs of appId and appKey - typedef bsl::unordered_set AppInfos; + /// `AppInfos` is an alias for a map [appId] -> appKey + typedef bsl::unordered_map AppInfos; typedef bmqc::Array diff --git a/src/groups/mqb/mqbi/mqbi_storagemanager.h b/src/groups/mqb/mqbi/mqbi_storagemanager.h index 58591e9d07..1c492202a1 100644 --- a/src/groups/mqb/mqbi/mqbi_storagemanager.h +++ b/src/groups/mqb/mqbi/mqbi_storagemanager.h @@ -167,7 +167,6 @@ class StorageManagerIterator { class StorageManager { public: // TYPES - typedef mqbi::Storage::AppInfo AppInfo; typedef mqbi::Storage::AppInfos AppInfos; typedef AppInfos::const_iterator AppInfosCIter; diff --git a/src/groups/mqb/mqbs/mqbs_datastore.h b/src/groups/mqb/mqbs/mqbs_datastore.h index 3de503c969..596a13f6f9 100644 --- a/src/groups/mqb/mqbs/mqbs_datastore.h +++ b/src/groups/mqb/mqbs/mqbs_datastore.h @@ -271,8 +271,6 @@ struct DataStoreRecordKeyLess { class DataStoreConfigQueueInfo { public: // TYPES - typedef mqbi::Storage::AppInfo AppInfo; - typedef mqbi::Storage::AppInfos AppInfos; private: @@ -299,7 +297,7 @@ class DataStoreConfigQueueInfo { void setPartitionId(int value); - void addAppInfo(const AppInfo& value); + void addAppInfo(const AppInfos::const_iterator& value); // ACCESSORS const bsl::string& canonicalQueueUri() const; @@ -335,8 +333,6 @@ class DataStoreConfig { typedef Records::const_iterator RecordConstIterator; - typedef mqbi::Storage::AppInfo AppInfo; - typedef mqbi::Storage::AppInfos AppInfos; typedef bsl::functionfirst, value->second); } // ACCESSORS diff --git a/src/groups/mqb/mqbs/mqbs_filebackedstorage.h b/src/groups/mqb/mqbs/mqbs_filebackedstorage.h index 827d8b7e3c..58f916468a 100644 --- a/src/groups/mqb/mqbs/mqbs_filebackedstorage.h +++ b/src/groups/mqb/mqbs/mqbs_filebackedstorage.h @@ -131,8 +131,6 @@ class FileBackedStorage BSLS_KEYWORD_FINAL : public ReplicatedStorage { public: // TYPES - typedef mqbi::Storage::AppInfo AppInfo; - typedef mqbi::Storage::AppInfos AppInfos; typedef ReplicatedStorage::RecordHandles RecordHandles; diff --git a/src/groups/mqb/mqbs/mqbs_filestore.cpp b/src/groups/mqb/mqbs/mqbs_filestore.cpp index bdf5c9106e..6a4298c92a 100644 --- a/src/groups/mqb/mqbs/mqbs_filestore.cpp +++ b/src/groups/mqb/mqbs/mqbs_filestore.cpp @@ -2067,7 +2067,7 @@ int FileStore::recoverMessages(QueueKeyInfoMap* queueKeyInfoMap, FileStoreProtocol::k_HASH_LENGTH, appIdsAreaLen); - AppInfos appIdKeyPairs; + AppInfos appIdKeyPairs(d_allocator_p); FileStoreProtocolUtil::loadAppInfos(&appIdKeyPairs, appIdsBlock, numAppIds); @@ -2075,8 +2075,7 @@ int FileStore::recoverMessages(QueueKeyInfoMap* queueKeyInfoMap, for (AppInfos::const_iterator cit = appIdKeyPairs.cbegin(); cit != appIdKeyPairs.cend(); ++cit) { - const AppInfo& p = *cit; - if (0 == deletedAppKeysOffsets.count(p.second)) { + if (0 == deletedAppKeysOffsets.count(cit->second)) { // This appKey is not deleted. Add it to the list // of 'alive' appId/appKey pairs for this queue. // Note that we don't check for appId/appKey @@ -2084,15 +2083,16 @@ int FileStore::recoverMessages(QueueKeyInfoMap* queueKeyInfoMap, // StorageMgr because we have recovered all // appId/appKey pairs by that time. - qinfo.addAppInfo(p); + qinfo.addAppInfo(cit); - BALL_LOG_INFO - << partitionDesc() - << "Recovered appId/appKey pair ['" << p.first - << "' (" << p.second << ")] in QueueOp [" - << queueOpType << "] record for queue [" - << qinfo.canonicalQueueUri() - << "] with queue key [" << queueKey << "]."; + BALL_LOG_INFO << partitionDesc() + << "Recovered appId/appKey pair ['" + << cit->first << "' (" << cit->second + << ")] in QueueOp [" << queueOpType + << "] record for queue [" + << qinfo.canonicalQueueUri() + << "] with queue key [" << queueKey + << "]."; } } } @@ -5720,13 +5720,12 @@ int FileStore::writeQueueCreationRecord(DataStoreRecordHandle* handle, for (AppInfos::const_iterator cit = appIdKeyPairs.cbegin(); cit != appIdKeyPairs.cend(); ++cit, ++i) { - const AppInfo& appIdKeyPair = *cit; - BSLS_ASSERT_SAFE(!appIdKeyPair.first.empty()); - BSLS_ASSERT_SAFE(!appIdKeyPair.second.isNull()); + BSLS_ASSERT_SAFE(!cit->first.empty()); + BSLS_ASSERT_SAFE(!cit->second.isNull()); appIdWords[i] = bmqp::ProtocolUtil::calcNumWordsAndPadding( &appIdPaddings[i], - appIdKeyPair.first.length()); - totalLength += sizeof(AppIdHeader) + appIdKeyPair.first.length() + + cit->first.length()); + totalLength += sizeof(AppIdHeader) + cit->first.length() + appIdPaddings[i] + FileStoreProtocol::k_HASH_LENGTH; // for AppKey } diff --git a/src/groups/mqb/mqbs/mqbs_filestore.h b/src/groups/mqb/mqbs/mqbs_filestore.h index 532f55ddf4..8d40e8bab2 100644 --- a/src/groups/mqb/mqbs/mqbs_filestore.h +++ b/src/groups/mqb/mqbs/mqbs_filestore.h @@ -209,7 +209,6 @@ class FileStore BSLS_KEYWORD_FINAL : public DataStore { typedef DataStoreConfig::QueueKeyInfoMapConstIter QueueKeyInfoMapConstIter; typedef DataStoreConfig::QueueKeyInfoMapInsertRc QueueKeyInfoMapInsertRc; - typedef mqbi::Storage::AppInfo AppInfo; typedef mqbi::Storage::AppInfos AppInfos; typedef StorageCollectionUtil::StoragesMap StoragesMap; diff --git a/src/groups/mqb/mqbs/mqbs_filestoreprotocolutil.cpp b/src/groups/mqb/mqbs/mqbs_filestoreprotocolutil.cpp index c6824b675e..04e871168b 100644 --- a/src/groups/mqb/mqbs/mqbs_filestoreprotocolutil.cpp +++ b/src/groups/mqb/mqbs/mqbs_filestoreprotocolutil.cpp @@ -331,10 +331,9 @@ int FileStoreProtocolUtil::calculateMd5Digest( } void FileStoreProtocolUtil::loadAppInfos( - bsl::unordered_set >* - appIdKeyPairs, - const MemoryBlock& appIdsBlock, - unsigned int numAppIds) + bsl::unordered_map* appIdKeyPairs, + const MemoryBlock& appIdsBlock, + unsigned int numAppIds) { // PRECONDITIONS BSLS_ASSERT_SAFE(appIdKeyPairs); diff --git a/src/groups/mqb/mqbs/mqbs_filestoreprotocolutil.h b/src/groups/mqb/mqbs/mqbs_filestoreprotocolutil.h index d6789c8418..11eec15a05 100644 --- a/src/groups/mqb/mqbs/mqbs_filestoreprotocolutil.h +++ b/src/groups/mqb/mqbs/mqbs_filestoreprotocolutil.h @@ -108,11 +108,10 @@ struct FileStoreProtocolUtil { const bmqu::BlobPosition& startPos, unsigned int length); - static void - loadAppInfos(bsl::unordered_set >* - appIdKeyPairs, - const MemoryBlock& appIdsBlock, - unsigned int numAppIds); + static void loadAppInfos( + bsl::unordered_map* appIdKeyPairs, + const MemoryBlock& appIdsBlock, + unsigned int numAppIds); }; } // close package namespace diff --git a/src/groups/mqb/mqbs/mqbs_filestoreprotocolutil.t.cpp b/src/groups/mqb/mqbs/mqbs_filestoreprotocolutil.t.cpp index ca37304b8c..8f374f4b5e 100644 --- a/src/groups/mqb/mqbs/mqbs_filestoreprotocolutil.t.cpp +++ b/src/groups/mqb/mqbs/mqbs_filestoreprotocolutil.t.cpp @@ -517,8 +517,7 @@ static void test4_loadAppInfos() // loadAppInfos() // ------------------------------------------------------------------------ { - typedef bsl::pair AppInfo; - typedef bsl::unordered_set AppInfos; + typedef bsl::unordered_map AppInfos; { // No appIds. @@ -666,9 +665,9 @@ static void test4_loadAppInfos() appHash, mqbs::FileStoreProtocol::k_HASH_LENGTH); - expectedAppInfos.emplace(AppInfo( + expectedAppInfos.emplace( bsl::string(appId, bmqtst::TestHelperUtil::allocator()), - appKey)); + appKey); offset += mqbs::FileStoreProtocol::k_HASH_LENGTH; } // Test. diff --git a/src/groups/mqb/mqbs/mqbs_inmemorystorage.h b/src/groups/mqb/mqbs/mqbs_inmemorystorage.h index 8d0cec0760..9677a1486f 100644 --- a/src/groups/mqb/mqbs/mqbs_inmemorystorage.h +++ b/src/groups/mqb/mqbs/mqbs_inmemorystorage.h @@ -167,8 +167,6 @@ class InMemoryStorage BSLS_KEYWORD_FINAL : public ReplicatedStorage { public: // TYPES - typedef mqbi::Storage::AppInfo AppInfo; - typedef mqbi::Storage::AppInfos AppInfos; typedef ReplicatedStorage::RecordHandles RecordHandles; diff --git a/src/groups/mqb/mqbs/mqbs_virtualstoragecatalog.h b/src/groups/mqb/mqbs/mqbs_virtualstoragecatalog.h index ede15bf1f6..1b37dfc6ae 100644 --- a/src/groups/mqb/mqbs/mqbs_virtualstoragecatalog.h +++ b/src/groups/mqb/mqbs/mqbs_virtualstoragecatalog.h @@ -75,8 +75,6 @@ class VirtualStorageCatalog { public: // TYPES - typedef mqbi::Storage::AppInfo AppInfo; - typedef mqbi::Storage::AppInfos AppInfos; typedef unsigned int Ordinal;