Skip to content

Commit

Permalink
Perf[BMQ,MQB]: stable GC history preventing allocations
Browse files Browse the repository at this point in the history
Signed-off-by: Evgeny Malygin <[email protected]>
  • Loading branch information
678098 committed Nov 3, 2024
1 parent 198843c commit 1196b50
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 21 deletions.
8 changes: 8 additions & 0 deletions src/groups/bmq/bmqc/bmqc_orderedhashmapwithhistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,13 @@
namespace BloombergLP {
namespace bmqc {

// -------------------------------------------
// struct OrderedHashMapWithHistory_ImpDetails
// -------------------------------------------

const int
OrderedHashMapWithHistory_ImpDetails::k_INSERT_GC_MESSAGES_BATCH_SIZE =
1000;

} // close package namespace
} // close enterprise namespace
60 changes: 55 additions & 5 deletions src/groups/bmq/bmqc/bmqc_orderedhashmapwithhistory.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,19 @@
namespace BloombergLP {
namespace bmqc {

// ===========================================
// struct OrderedHashMapWithHistory_ImpDetails
// ===========================================

/// PRIVATE CLASS.
// For use only by `bmqc::OrderedHashMapWithHistory` implementation.
struct OrderedHashMapWithHistory_ImpDetails {
// PRIVATE CLASS DATA
/// How many messages to GC when GC required in
/// `bmqc::OrderedHashMapWithHistory::insert`
static const int k_INSERT_GC_MESSAGES_BATCH_SIZE;
};

// ========================================
// class OrderedHashMapWithHistory_Iterator
// ========================================
Expand Down Expand Up @@ -217,7 +230,27 @@ class OrderedHashMapWithHistory {

size_t d_historySize; // how many historical (!d_isLive) items

gc_iterator d_gcIt; // where to start 'gc'
/// Whether this container has more elements to `gc`. This flag might be
/// set or unset during every `gc` call according to this container's
/// needs.
bool d_requireGC;

/// The `now` time of the last GC. We assume that the current actual time
/// is no less than this timestamp.
TimeType d_lastGCTime;

/// The iterator pointing to the element where garbage collection should
/// continue once `gc` is called. According to contract, this iterator
/// only goes forward. All the elements passed by this iterator are either
/// removed or marked for removal, depending on what happened first:
/// - If the element was not erased by the user before, but its timeout
/// happened in this container, it is marked for deletion in `gc` and
/// iterator goes forward. Next, it is the user's responsibility to call
/// `erase` on this element to fully remove it.
/// - If the user removes the element before its timeout happened, the
/// element becomes `not alive`, but still lives in the history.
/// Eventually `gc` reaches this element and fully removes it.
gc_iterator d_gcIt;

// PRIVATE CLASS METHODS
static const KEY& get_key(const bsl::pair<const KEY, VALUE>& value)
Expand Down Expand Up @@ -486,6 +519,8 @@ inline OrderedHashMapWithHistory<KEY, VALUE, HASH, VALUE_TYPE>::
, d_first(d_impl.end())
, d_last(d_impl.end())
, d_historySize(0)
, d_requireGC(false)
, d_lastGCTime(0)
, d_gcIt(endGc())
{
// NOTHING
Expand Down Expand Up @@ -515,6 +550,8 @@ inline void OrderedHashMapWithHistory<KEY, VALUE, HASH, VALUE_TYPE>::clear()

d_first = d_last = end();
d_gcIt = endGc();
d_requireGC = false;
d_lastGCTime = 0;
d_historySize = 0;
}

Expand Down Expand Up @@ -617,6 +654,12 @@ OrderedHashMapWithHistory<KEY, VALUE, HASH, VALUE_TYPE>::insert(
const SOURCE_TYPE& value,
TimeType timePoint)
{
if (d_requireGC) {
gc(bsl::max(timePoint, d_lastGCTime),
OrderedHashMapWithHistory_ImpDetails::
k_INSERT_GC_MESSAGES_BATCH_SIZE);
}

bsl::pair<gc_iterator, bool> result = d_impl.insert(
Value(value, d_timeout ? timePoint + d_timeout : 0));
// No need to keep track of element's timePoint if the map is not
Expand Down Expand Up @@ -652,7 +695,9 @@ OrderedHashMapWithHistory<KEY, VALUE, HASH, VALUE_TYPE>::gc(TimeType now,
// 'erase' can set the iterator back to erase item if its expiration time
// is sooner than the current one.

if (d_gcIt == endGc()) {
d_lastGCTime = now;
if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(d_gcIt == endGc())) {
BSLS_PERFORMANCEHINT_UNLIKELY_HINT;
d_gcIt = beginGc();
}

Expand All @@ -664,23 +709,28 @@ OrderedHashMapWithHistory<KEY, VALUE, HASH, VALUE_TYPE>::gc(TimeType now,
}
gc_iterator it = d_gcIt++;
if (it->d_isLive) {
// This is an old item. It should be removed by 'erase'.
// No need to return to this item. No need to check time again.
// Indicate that it needs to be removed by setting its time to 0.
// This item was not erased by the user yet, but its timeout in
// this container happened. Mark it for deletion by setting
// `d_time` to 0, so the next time user calls `erase` on it, it
// will be fully removed.
it->d_time = 0;
}
else {
// This item was erased by the user before, and we can fully remove
// it right here.
d_impl.erase(it);
--d_historySize;
}
// Meaning, there is no need for 'd_gcIt' to step back. Only forward.

if (--batchSize == 0) {
// remember where we have stopped and resume from there next time
d_requireGC = true;
return true; // RETURN
}
}

d_requireGC = false;
return false;
}

Expand Down
2 changes: 1 addition & 1 deletion src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ namespace BloombergLP {
namespace mqbblp {

namespace {
const int k_GC_MESSAGES_INTERVAL_SECONDS = 30;
const int k_GC_MESSAGES_INTERVAL_SECONDS = 5;

bsl::ostream& printRecoveryBanner(bsl::ostream& out,
const bsl::string& lastLineSuffix)
Expand Down
2 changes: 1 addition & 1 deletion src/groups/mqb/mqbc/mqbc_storagemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ namespace BloombergLP {
namespace mqbc {

namespace {
const int k_GC_MESSAGES_INTERVAL_SECONDS = 30;
const int k_GC_MESSAGES_INTERVAL_SECONDS = 5;

bool isPrimaryActive(const mqbi::StorageManager_PartitionInfo pinfo)
{
Expand Down
24 changes: 10 additions & 14 deletions src/groups/mqb/mqbs/mqbs_filestore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7212,22 +7212,18 @@ void FileStore::flush()
return; // RETURN
}

const bool haveMore = gcExpiredMessages(bdlt::CurrentTime::utc());
const bool haveMoreHistory = gcHistory();
BSLA_MAYBE_UNUSED const bool haveMore = gcExpiredMessages(
bdlt::CurrentTime::utc());
BSLA_MAYBE_UNUSED const bool haveMoreHistory = gcHistory();

// This is either Idle or k_GC_MESSAGES_INTERVAL_SECONDS timeout.
// 'gcHistory' attempts to iterate all old items. If there are more of them
// than the batchSize (1000), it returns 'true'. In this case, re-enable
// flush client to call it again next Idle time.
// If it returns 'false', there is no immediate work. Wait for the
// next k_GC_MESSAGES_INTERVAL_SECONDS.

if (haveMore || haveMoreHistory) {
// Explicitly schedule 'flush()' instead of relying on idleness
dispatcher()->execute(bdlf::BindUtil::bind(&FileStore::flush, this),
this,
mqbi::DispatcherEventType::e_CALLBACK);
}
// We try to remove at most k_GC_MESSAGES_BATCH_SIZE items in history.
// If there are more items ready to remove, the container's state changes,
// so any additional `insert` operation to the container will cause
// additional GC, until all old items are removed.
// If we don't balance adding new elements to the history with GC history,
// we might lose a lot of time on allocations of new items to the history,
// as well as get OOM due to uncontrollable history size increase.
}

void FileStore::setReplicationFactor(int value)
Expand Down

0 comments on commit 1196b50

Please sign in to comment.