-
Notifications
You must be signed in to change notification settings - Fork 140
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Perf[MQB]: fix linear iteration over all queues on any replication #525
Perf[MQB]: fix linear iteration over all queues on any replication #525
Conversation
Signed-off-by: Evgeny Malygin <[email protected]>
e4b086b
to
0e2382d
Compare
Signed-off-by: Evgeny Malygin <[email protected]>
Signed-off-by: Evgeny Malygin <[email protected]>
Signed-off-by: Evgeny Malygin <[email protected]>
Signed-off-by: Evgeny Malygin <[email protected]>
Signed-off-by: Evgeny Malygin <[email protected]>
Signed-off-by: Evgeny Malygin <[email protected]>
@@ -203,7 +203,7 @@ class FileBackedStorage BSLS_KEYWORD_FINAL : public ReplicatedStorage { | |||
bmqp::SchemaLearner::Context d_schemaLearnerContext; | |||
// Context for replicated data. | |||
|
|||
const bool d_hasReceipts; | |||
bool d_hasReceipts; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We set this flag once and didn't update it before. It is also used here for early return:
blazingmq/src/groups/mqb/mqbs/mqbs_filebackedstorage.cpp
Lines 216 to 221 in c0f8832
bool FileBackedStorage::hasReceipt(const bmqt::MessageGUID& msgGUID) const | |
{ | |
if (d_hasReceipts) { | |
// Weak consistency | |
return true; // RETURN | |
} |
src/groups/mqb/mqbi/mqbi_storage.h
Outdated
@@ -408,6 +408,10 @@ class Storage { | |||
const bsls::Types::Int64 messageTtl, | |||
const int maxDeliveryAttempts) = 0; | |||
|
|||
/// Set the consistency level associated to this storage to the specified | |||
/// `value`. | |||
virtual bool setConsistency(const mqbconfm::Consistency& value) = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I decided to introduce a separate setter instead of extending configure
, because sometimes we call configure
without setting new consistency.
Signed-off-by: Evgeny Malygin <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Build 385 of commit 4fc5276 has completed with FAILURE
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good, we just need to revisit cancelReplicationNotification
logic
d_replicationNotifications.begin(); | ||
it != d_replicationNotifications.end(); | ||
it++) { | ||
StoragesMap::iterator storageIt = d_storages.find(*it); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is some possibility to avoid this second lookup but then we need to rely on storage being empty when we remove it. Maybe, we can just add a comment now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a comment, let's revisit it later
ReplicatedStorage* rs = it->second; | ||
if (rs->queue()) { | ||
rs->queue()->onReplicatedBatch(); | ||
// good time to flush weak consistency queues. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This overlaps with the code in dispatcherFlush
. Maybe, we can refactor a bit
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I removed code duplication and also splitted dispatcherFlush
-> flushStorage
, flushQueues
src/groups/mqb/mqbs/mqbs_filestore.h
Outdated
/// Cancel replication notification (if any) for the specified `queueKey`. | ||
/// No effect if `queueKey` is not found in this storage or if there are no | ||
/// current registered notifications for this queue. | ||
void cancelReplicationNotification(const mqbu::StorageKey& queueKey); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When we apply new config, the old messages may keep their logic. For example, DataStoreRecord::d_hasReceipt
is always true
, if at the time of message creation, the consistency was weak and false
otherwise, until we get the receipt. And generating receipts should be controlled by header flags (not the configuration)
In this light, is there a possibility to simplify and get by without cancelReplicationNotification
? The notification of "in-transit" messages will treat then as "weak" which is correct. As I understand, those messages will never get receipts.
The statement about old messages keeping their logic has one flaw when reconfiguring from strong to weak. That is because of the early return in
bool FileBackedStorage::hasReceipt(const bmqt::MessageGUID& msgGUID) const
{
if (d_hasReceipts) {
// Weak consistency
return true; // RETURN
}
We have options here
- Remove the early return
- Accept the fact that "strong to weak" change applies to existing messages.
- anything else?
I thing either is fine.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This early return is helpful because it prevents us from doing this lookup a few lines below:
RecordHandleMap::const_iterator it = d_handles.find(msgGUID);
I think it's nice for performance that we don't access OrderedHashMapWithHistory
here. Option (2) is fine by me. We do have a thread race anyway between reconfigure and incoming messages.
I also removed cancelReplicationNotification
to keep things simple
Signed-off-by: Evgeny Malygin <[email protected]>
Signed-off-by: Evgeny Malygin <[email protected]>
Signed-off-by: Evgeny Malygin <[email protected]>
Signed-off-by: Evgeny Malygin <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Build 391 of commit f7be85e has completed with FAILURE
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is getting better and better.
One more concern about flushQueue
API.
src/groups/mqb/mqbi/mqbi_storage.h
Outdated
/// undefined unless this cluster node is the primary for this partition. | ||
virtual void flushStorage() = 0; | ||
|
||
/// Flush all associated weak consistency queues. Behaviour is |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But we flush only if d_storageEventBuilder
is empty. See another comment about flushQueues
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated the comment
@@ -538,7 +533,7 @@ void LocalQueue::postMessage(const bmqp::PutHeader& putHeader, | |||
|
|||
// If 'FileStore::d_storageEventBuilder' is flushed, flush all relevant | |||
// queues (call 'afterNewMessage' to deliver accumulated data) | |||
d_state_p->storage()->dispatcherFlush(false, true); | |||
d_state_p->storage()->flushQueues(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This (already existing) logic is a bit unclear. Please, check the following reasoning:
LocalQueue::postMessage
callsput
which adds to the builder in theFileStore
.FileStore
tracks the builder size and doesFileStore::flushIfNeeded
which used to callnotifyQueuesOnReplicatedBatch
- Then, why do we need this call
dispatcherFlush(false, true)
/flushQueues()
here?
If we don't, then there is no need to have flushQueues
as an interface method. It is entirely private business of FileStore
(and something like notifyQueuesOnReplicatedBatch
seems like a better name)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is correct, this call is unnecessary. Removed & simplified
Signed-off-by: Evgeny Malygin <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Build 392 of commit 7a4de87 has completed with FAILURE
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thank you!
/// all associated queues. Behavior is undefined unless this node is | ||
/// the primary for this partition. | ||
virtual void dispatcherFlush(bool storage, bool queues) = 0; | ||
/// all associated queues. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@678098 should this comment from 685-687 be removed? Looks like flushStorage()
docs don't need this bit anymore
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will clean this up
d_hasNewMessages
flag frommqbblp::LocalQueue
.FileBackedStorage::d_hasReceipts
flag on domain reconfigure when consistency changes.