Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Perf[MQB]: fix linear iteration over all queues on any replication #525

Merged
merged 13 commits into from
Dec 3, 2024
31 changes: 8 additions & 23 deletions src/groups/mqb/mqbblp/mqbblp_localqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ LocalQueue::LocalQueue(QueueState* state, bslma::Allocator* allocator)
, d_state_p(state)
, d_queueEngine_mp(0)
, d_throttledFailedPutMessages(5000, 1) // 1 log per 5s interval
, d_hasNewMessages(false)
, d_throttledDuplicateMessages()
, d_haveStrongConsistency(false)
{
Expand Down Expand Up @@ -140,6 +139,7 @@ int LocalQueue::configure(bsl::ostream& errorDescription, bool isReconfigure)
d_state_p->setStorage(storageMp);
}
else {
d_state_p->storage()->setConsistency(domainCfg.consistency());
rc = d_state_p->storage()->configure(errorDescription,
domainCfg.storage().config(),
domainCfg.storage().queueLimits(),
Expand Down Expand Up @@ -284,7 +284,7 @@ void LocalQueue::configureHandle(
// to it. We need to make sure that storage/replication is in sync, and
// thus, we force-flush the file store.

d_state_p->storage()->dispatcherFlush(true, false);
d_state_p->storage()->flushStorage();

// Attempt to deliver all data in the storage. Otherwise, broadcast
// can get dropped if the incoming configure request removes consumers.
Expand All @@ -306,7 +306,7 @@ void LocalQueue::releaseHandle(
BSLS_ASSERT_SAFE(d_state_p->queue()->dispatcher()->inDispatcherThread(
d_state_p->queue()));

d_state_p->storage()->dispatcherFlush(true, false);
d_state_p->storage()->flushStorage();

d_queueEngine_mp->releaseHandle(handle,
handleParameters,
Expand Down Expand Up @@ -375,8 +375,8 @@ void LocalQueue::flush()
// until it gets rolled back. If 'flush' gets called in between, the queue
// may have no storage.
if (d_state_p->storage()) {
d_state_p->storage()->dispatcherFlush(true, false);
// See notes in 'FileStore::dispatcherFlush' for motivation behind
d_state_p->storage()->flushStorage();
// See notes in 'FileStore::flushStorage' for motivation behind
// this flush.
}

Expand Down Expand Up @@ -502,8 +502,7 @@ void LocalQueue::postMessage(const bmqp::PutHeader& putHeader,
if (BSLS_PERFORMANCEHINT_PREDICT_LIKELY(res ==
mqbi::StorageResult::e_SUCCESS)) {
// Message has been saved in the storage, but we don't notify the
// engine yet of the new message, instead we just update the
// 'd_hasNewMessages' flag. This is because storage (replicated)
// engine yet of the new message. This is because storage (replicated)
// messages are nagled, and we don't want to indicate to a peer to
// deliver a particular guid downstream, before actually replicating
// that message. So notification to deliver a particular guid
Expand All @@ -513,10 +512,6 @@ void LocalQueue::postMessage(const bmqp::PutHeader& putHeader,

d_state_p->stats().onEvent(mqbstat::QueueStatsDomain::EventType::e_PUT,
appData->length());

if (haveReceipt && refCount) {
d_hasNewMessages = true;
}
}
else {
BSLS_PERFORMANCEHINT_UNLIKELY_HINT;
Expand All @@ -535,10 +530,6 @@ void LocalQueue::postMessage(const bmqp::PutHeader& putHeader,
1);
}
}

// If 'FileStore::d_storageEventBuilder' is flushed, flush all relevant
// queues (call 'afterNewMessage' to deliver accumulated data)
d_state_p->storage()->dispatcherFlush(false, true);
}

void LocalQueue::onPushMessage(
Expand Down Expand Up @@ -572,8 +563,6 @@ void LocalQueue::onReceipt(const bmqt::MessageGUID& msgGUID,
// filled downstream.
qH->onAckMessage(ackMessage);
} // else the handle is gone

d_hasNewMessages = true;
}

void LocalQueue::onRemoval(const bmqt::MessageGUID& msgGUID,
Expand Down Expand Up @@ -601,12 +590,8 @@ void LocalQueue::deliverIfNeeded()
{
// Now that storage messages have been flushed, notify the engine (and thus
// any peer or downstream client) to deliver next applicable message.

if (d_hasNewMessages) {
d_hasNewMessages = false;
d_queueEngine_mp->afterNewMessage(bmqt::MessageGUID(),
static_cast<mqbi::QueueHandle*>(0));
}
d_queueEngine_mp->afterNewMessage(bmqt::MessageGUID(),
static_cast<mqbi::QueueHandle*>(0));
}

void LocalQueue::confirmMessage(const bmqt::MessageGUID& msgGUID,
Expand Down
1 change: 0 additions & 1 deletion src/groups/mqb/mqbblp/mqbblp_localqueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ class LocalQueue BSLS_CPP11_FINAL {
QueueState* d_state_p;
bslma::ManagedPtr<mqbi::QueueEngine> d_queueEngine_mp;
bmqu::ThrottledActionParams d_throttledFailedPutMessages;
bool d_hasNewMessages;
bdlmt::Throttle d_throttledDuplicateMessages;
// Throttler for duplicates.
bool d_haveStrongConsistency;
Expand Down
1 change: 1 addition & 0 deletions src/groups/mqb/mqbblp/mqbblp_queueenginetester.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,7 @@ void QueueEngineTester::init(const mqbconfm::Domain& domainConfig,
limits.messages() = bsl::numeric_limits<bsls::Types::Int64>::max();
limits.bytes() = bsl::numeric_limits<bsls::Types::Int64>::max();

storage_p->setConsistency(domainConfig.consistency());
rc = storage_p->configure(errorDescription,
config,
limits,
Expand Down
2 changes: 2 additions & 0 deletions src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ int RemoteQueue::configureAsProxy(bsl::ostream& errorDescription,
limits.messages() = bsl::numeric_limits<bsls::Types::Int64>::max();
limits.bytes() = bsl::numeric_limits<bsls::Types::Int64>::max();

storageMp->setConsistency(domainCfg.consistency());
int rc = storageMp->configure(errorDescription,
config,
limits,
Expand Down Expand Up @@ -243,6 +244,7 @@ int RemoteQueue::configureAsClusterMember(bsl::ostream& errorDescription,
d_allocator_p);
}
else {
d_state_p->storage()->setConsistency(domainCfg.consistency());
rc = d_state_p->storage()->configure(
errorDescription,
domainCfg.storage().config(),
Expand Down
2 changes: 1 addition & 1 deletion src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1253,7 +1253,7 @@ void RootQueueEngine::onHandleUsable(mqbi::QueueHandle* handle,
}

// Before attempting to deliver any messages, flush the storage.
d_queueState_p->queue()->storage()->dispatcherFlush(true, false);
d_queueState_p->queue()->storage()->flushStorage();

unsigned int upstreamSubQueueId = 0;
if (d_queueState_p->routingContext().onUsable(&upstreamSubQueueId,
Expand Down
8 changes: 5 additions & 3 deletions src/groups/mqb/mqbc/mqbc_storageutil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ void StorageUtil::registerQueueDispatched(
// partition, we want to make sure that queue creation record written to
// the partition above is sent to the replicas as soon as possible.

fs->dispatcherFlush(true, false);
fs->flushStorage();

BALL_LOG_INFO << clusterDescription << ": Partition [" << partitionId
<< "] registered [" << storage->queueUri() << "], queueKey ["
Expand Down Expand Up @@ -347,7 +347,7 @@ int StorageUtil::updateQueuePrimaryRaw(mqbs::ReplicatedStorage* storage,

// Flush the partition for records written above to reach replicas right
// away.
fs->dispatcherFlush(true, false);
fs->flushStorage();

bmqu::Printer<AppInfos> printer1(&addedIdKeyPairs);
bmqu::Printer<AppInfos> printer2(&removedIdKeyPairs);
Expand Down Expand Up @@ -2654,7 +2654,7 @@ void StorageUtil::unregisterQueueDispatched(
// that the partition is flushed and the QueueDeletion record reaches
// replicas.

fs->dispatcherFlush(true, false);
fs->flushStorage();
}

int StorageUtil::updateQueuePrimary(StorageSpMap* storageMap,
Expand Down Expand Up @@ -3210,6 +3210,8 @@ int StorageUtil::makeStorage(bsl::ostream& errorDescription,
// Configure the storage. Note that if a queue calls 'makeStorage' twice,
// its storage will be configured twice as things are currently.

// Do not change the consistency level of `storageSp` here; use the one
// provided at construction instead.
const int rc = storageSp->configure(errorDescription,
storageDef.config(),
storageDef.queueLimits(),
Expand Down
12 changes: 7 additions & 5 deletions src/groups/mqb/mqbi/mqbi_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,10 @@ class Storage {
const bsls::Types::Int64 messageTtl,
const int maxDeliveryAttempts) = 0;

/// Set the consistency level associated to this storage to the specified
/// `value`.
virtual void setConsistency(const mqbconfm::Consistency& value) = 0;

virtual void setQueue(mqbi::Queue* queue) = 0;

/// Close this storage.
Expand Down Expand Up @@ -503,11 +507,9 @@ class Storage {
/// * e_APPKEY_NOT_FOUND : Invalid `appKey` specified
virtual StorageResult::Enum removeAll(const mqbu::StorageKey& appKey) = 0;

/// If the specified `storage` is `true`, flush any buffered replication
/// messages to the peers. If the specified `queues` is `true`, `flush`
/// all associated queues. Behavior is undefined unless this node is
/// the primary for this partition.
virtual void dispatcherFlush(bool storage, bool queues) = 0;
/// Flush any buffered replication messages to the peers. Behaviour is
/// undefined unless this cluster node is the primary for this partition.
virtual void flushStorage() = 0;

/// Return the resource capacity meter associated to this storage.
virtual mqbu::CapacityMeter* capacityMeter() = 0;
Expand Down
8 changes: 5 additions & 3 deletions src/groups/mqb/mqbs/mqbs_datastore.h
Original file line number Diff line number Diff line change
Expand Up @@ -684,9 +684,11 @@ class DataStore : public mqbi::DispatcherClient {

/// If the specified `storage` is `true`, flush any buffered replication
/// messages to the peers. If the specified `queues` is `true`, `flush`
/// all associated queues. Behavior is undefined unless this node is
/// the primary for this partition.
virtual void dispatcherFlush(bool storage, bool queues) = 0;
/// all associated queues.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@678098 should this comment from 685-687 be removed? Looks like flushStorage() docs don't need this bit anymore

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will clean this up


/// Flush any buffered replication messages to the peers. Behaviour is
/// undefined unless this cluster node is the primary for this partition.
virtual void flushStorage() = 0;

// ACCESSORS

Expand Down
16 changes: 12 additions & 4 deletions src/groups/mqb/mqbs/mqbs_filebackedstorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,14 @@ int FileBackedStorage::configure(
return 0;
}

void FileBackedStorage::setConsistency(const mqbconfm::Consistency& value)
{
    // Determine once whether the specified 'value' selects the eventual
    // consistency level; the only other selection accepted is strong.
    const bool isEventual = value.isEventualValue();

    // PRECONDITIONS
    BSLS_ASSERT_SAFE(isEventual || value.isStrongValue());

    // 'd_hasReceipts' is set exactly when eventual consistency is selected,
    // and cleared when strong consistency is selected.
    d_hasReceipts = isEventual;
}

void FileBackedStorage::setQueue(mqbi::Queue* queue)
{
d_virtualStorageCatalog.setQueue(queue);
Expand Down Expand Up @@ -580,7 +588,7 @@ FileBackedStorage::removeAll(const mqbu::StorageKey& appKey)

if (appKey.isNull()) {
purgeCommon(appKey); // or 'mqbu::StorageKey::k_NULL_KEY'
dispatcherFlush(true, false);
flushStorage();
d_isEmpty.storeRelaxed(1);

return mqbi::StorageResult::e_SUCCESS; // RETURN
Expand All @@ -591,7 +599,7 @@ FileBackedStorage::removeAll(const mqbu::StorageKey& appKey)
d_virtualStorageCatalog.removeAll(appKey);
// This will call back 'releaseRef'

dispatcherFlush(true, false);
flushStorage();

if (d_handles.empty()) {
d_isEmpty.storeRelaxed(1);
Expand All @@ -606,9 +614,9 @@ FileBackedStorage::removeAll(const mqbu::StorageKey& appKey)
return mqbi::StorageResult::e_SUCCESS;
}

void FileBackedStorage::dispatcherFlush(bool storage, bool queues)
void FileBackedStorage::flushStorage()
{
d_store_p->dispatcherFlush(storage, queues);
d_store_p->flushStorage();
}

int FileBackedStorage::gcExpiredMessages(
Expand Down
16 changes: 9 additions & 7 deletions src/groups/mqb/mqbs/mqbs_filebackedstorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ class FileBackedStorage BSLS_KEYWORD_FINAL : public ReplicatedStorage {
bmqp::SchemaLearner::Context d_schemaLearnerContext;
// Context for replicated data.

const bool d_hasReceipts;
bool d_hasReceipts;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We set this flag once and didn't update it before. It is also used here for early return:

bool FileBackedStorage::hasReceipt(const bmqt::MessageGUID& msgGUID) const
{
if (d_hasReceipts) {
// Weak consistency
return true; // RETURN
}


bmqt::MessageGUID d_currentlyAutoConfirming;
// Message being evaluated and possibly auto confirmed.
Expand Down Expand Up @@ -365,6 +365,11 @@ class FileBackedStorage BSLS_KEYWORD_FINAL : public ReplicatedStorage {
const bsls::Types::Int64 messageTtl,
int maxDeliveryAttempts) BSLS_KEYWORD_OVERRIDE;

/// Set the consistency level associated to this storage to the specified
/// `value`.
void
setConsistency(const mqbconfm::Consistency& value) BSLS_KEYWORD_OVERRIDE;

/// Return the resource capacity meter associated to this storage.
virtual mqbu::CapacityMeter* capacityMeter() BSLS_KEYWORD_OVERRIDE;

Expand Down Expand Up @@ -466,12 +471,9 @@ class FileBackedStorage BSLS_KEYWORD_FINAL : public ReplicatedStorage {
virtual mqbi::StorageResult::Enum
removeAll(const mqbu::StorageKey& appKey) BSLS_KEYWORD_OVERRIDE;

/// If the specified `storage` is `true`, flush any buffered replication
/// messages to the peers. If the specified `queues` is `true`, `flush`
/// all associated queues. Behavior is undefined unless this node is
/// the primary for this partition.
virtual void dispatcherFlush(bool storage,
bool queues) BSLS_KEYWORD_OVERRIDE;
/// Flush any buffered replication messages to the peers. Behaviour is
/// undefined unless this cluster node is the primary for this partition.
void flushStorage() BSLS_KEYWORD_OVERRIDE;

/// Attempt to garbage-collect messages for which TTL has expired, and
/// return the number of messages garbage-collected. Populate the
Expand Down
4 changes: 2 additions & 2 deletions src/groups/mqb/mqbs/mqbs_filebackedstorage.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ class MockDataStore : public mqbs::DataStore {

void clearPrimary() BSLS_KEYWORD_OVERRIDE {}

void dispatcherFlush(bool, bool) BSLS_KEYWORD_OVERRIDE {}
void flushStorage() BSLS_KEYWORD_OVERRIDE {}

bool isOpen() const BSLS_KEYWORD_OVERRIDE { return true; }

Expand Down Expand Up @@ -724,7 +724,7 @@ TEST(breathingTest)
ASSERT_NE(storage.queue(), static_cast<mqbi::Queue*>(0));
// Queue has been set via call to 'setQueue'

ASSERT_PASS(storage.dispatcherFlush(true, false));
ASSERT_PASS(storage.flushStorage());
// Does nothing, at the time of this writing

ASSERT_EQ(storage.queueOpRecordHandles().empty(), true);
Expand Down
Loading
Loading