Skip to content

Commit

Permalink
fix PushStream & VirtualStorage ordinal (#528)
Browse files Browse the repository at this point in the history
Signed-off-by: dorjesinpo <[email protected]>
  • Loading branch information
dorjesinpo authored Nov 26, 2024
1 parent 67abd63 commit a5f8a06
Show file tree
Hide file tree
Showing 9 changed files with 77 additions and 74 deletions.
10 changes: 4 additions & 6 deletions src/groups/mqb/mqbblp/mqbblp_pushstream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,8 @@ void PushStreamIterator::removeCurrentElement()
d_currentElement = d_currentElement->next();
++d_currentOrdinal;

d_owner_p->remove(del);
d_owner_p->destroy(del, true);
// doKeepGuid because of the d_iterator
d_owner_p->remove(del, false);
// cannot erase the GUID because of the d_iterator

if (d_iterator->second.numElements() == 0) {
BSLS_ASSERT_SAFE(d_currentElement == 0);
Expand Down Expand Up @@ -291,9 +290,8 @@ bool VirtualPushStreamIterator::advance()

d_currentElement = d_currentElement->nextInApp();

d_owner_p->remove(del);
d_owner_p->destroy(del, false);
// do not keep Guid
d_owner_p->remove(del, true);
// can erase GUID

if (atEnd()) {
return false;
Expand Down
44 changes: 19 additions & 25 deletions src/groups/mqb/mqbblp/mqbblp_pushstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,11 @@ struct PushStream {
void add(Element* element);

/// Remove the specified `element` from both GUID and App corresponding to
/// the `element` (and specified when constructing the `element`).
/// Return the number of remaining Elements in the corresponding GUID.
unsigned int remove(Element* element);
/// the `element` (and specified when constructing the `element`). If
/// there are no more elements in the App, erase the App. If the specified
/// `canEraseGuid` is `true` and there are no more elements in the GUID,
/// erase the GUID.
void remove(Element* element, bool canEraseGuid);

/// Remove all PushStream Elements corresponding to the specified
/// `upstreamSubQueueId`. Erase each corresponding GUIDs from the
Expand All @@ -218,9 +220,6 @@ struct PushStream {
Element* create(const bmqp::SubQueueInfo& info,
const iterator& iterator,
const Apps::iterator& iteratorApp);

/// Destroy the specified `element`
void destroy(Element* element, bool doKeepGuid);
};

// ========================
Expand Down Expand Up @@ -605,6 +604,7 @@ inline void PushStream::App::add(Element* element)
{
d_elements.add(element, e_APP);
}

inline void PushStream::App::remove(Element* element)
{
d_elements.remove(element, e_APP);
Expand All @@ -631,19 +631,6 @@ PushStream::create(const bmqp::SubQueueInfo& subscription,
return element;
}

inline void PushStream::destroy(Element* element, bool doKeepGuid)
{
if (element->app().d_elements.numElements() == 0) {
element->eraseApp(d_apps);
}

if (!doKeepGuid && element->guid().numElements() == 0) {
element->eraseGuid(d_stream);
}

d_pushElementsPool_sp->deallocate(element);
}

inline PushStream::iterator
PushStream::findOrAppendMessage(const bmqt::MessageGUID& guid)
{
Expand All @@ -662,7 +649,7 @@ inline void PushStream::add(Element* element)
element->app().add(element);
}

inline unsigned int PushStream::remove(Element* element)
inline void PushStream::remove(Element* element, bool canEraseGuid)
{
BSLS_ASSERT_SAFE(element);
BSLS_ASSERT_SAFE(!element->equal(d_stream.end()));
Expand All @@ -673,7 +660,15 @@ inline unsigned int PushStream::remove(Element* element)
// remove from the guid
element->guid().remove(element, e_GUID);

return element->guid().numElements();
if (element->app().d_elements.numElements() == 0) {
element->eraseApp(d_apps);
}

if (canEraseGuid && element->guid().numElements() == 0) {
element->eraseGuid(d_stream);
}

d_pushElementsPool_sp->deallocate(element);
}

inline unsigned int PushStream::removeApp(unsigned int upstreamSubQueueId)
Expand All @@ -695,10 +690,9 @@ inline unsigned int PushStream::removeApp(Apps::iterator itApp)
for (unsigned int count = 0; count < numElements; ++count) {
Element* element = itApp->second.d_elements.front();

remove(element);

destroy(element, false);
// do not keep Guid
remove(element, true);
// do not keep Guid. This relies on either 'beforeOneAppRemoved' or
// resetting iterator(s).
}

return numElements;
Expand Down
9 changes: 3 additions & 6 deletions src/groups/mqb/mqbblp/mqbblp_pushstream.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,7 @@ static void test1_basic()
itApp);

ps.add(element);
ps.remove(element);
ps.destroy(element, false);
ps.remove(element, true);
}

static void test2_iterations()
Expand Down Expand Up @@ -183,10 +182,8 @@ static void test2_iterations()
ASSERT(vit.atEnd());
}

ps.remove(element2);
ps.destroy(element2, false);
ps.remove(element3);
ps.destroy(element3, false);
ps.remove(element2, true);
ps.remove(element3, true);
}

// ============================================================================
Expand Down
42 changes: 25 additions & 17 deletions src/groups/mqb/mqbblp/mqbblp_queueengineutil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,8 @@ QueueEngineUtil_AppsDeliveryContext::QueueEngineUtil_AppsDeliveryContext(
mqbi::Queue* queue,
bslma::Allocator* allocator)
: d_consumers(allocator)
, d_isReady(false)
, d_numApps(0)
, d_numStops(0)
, d_currentMessage(0)
, d_queue_p(queue)
, d_timeDelta()
Expand All @@ -652,25 +653,26 @@ QueueEngineUtil_AppsDeliveryContext::QueueEngineUtil_AppsDeliveryContext(
BSLS_ASSERT_SAFE(queue);
}

void QueueEngineUtil_AppsDeliveryContext::start()
{
d_isReady = true;
}

bool QueueEngineUtil_AppsDeliveryContext::reset(
mqbi::StorageIterator* currentMessage)
{
d_consumers.clear();
d_timeDelta.reset();

if (!d_isReady) {
return false; // RETURN
bool result = false;

if (haveProgress() && currentMessage && currentMessage->hasReceipt()) {
d_currentMessage = currentMessage;
result = true;
}
else {
d_currentMessage = 0;
}

d_currentMessage = currentMessage;
d_isReady = false;
d_numApps = 0;
d_numStops = 0;

return d_currentMessage ? d_currentMessage->hasReceipt() : false;
return result;
}

bool QueueEngineUtil_AppsDeliveryContext::processApp(
Expand All @@ -679,12 +681,12 @@ bool QueueEngineUtil_AppsDeliveryContext::processApp(
{
BSLS_ASSERT_SAFE(d_currentMessage->hasReceipt());

++d_numApps;

if (d_queue_p->isDeliverAll()) {
// collect all handles
app.routing()->iterateConsumers(d_broadcastVisitor, d_currentMessage);

d_isReady = true;

// Broadcast does not need stats nor any special per-message treatment.
return false; // RETURN
}
Expand All @@ -695,6 +697,7 @@ bool QueueEngineUtil_AppsDeliveryContext::processApp(
// The queue iterator can advance leaving the 'app' behind.
app.setResumePoint(d_currentMessage->guid());
}
++d_numStops;
// else the existing resumePoint is earlier (if authorized)
return false; // RETURN
}
Expand All @@ -703,7 +706,6 @@ bool QueueEngineUtil_AppsDeliveryContext::processApp(
ordinal);

if (!appView.isNew()) {
d_isReady = true;
return true; // RETURN
}

Expand Down Expand Up @@ -736,7 +738,9 @@ bool QueueEngineUtil_AppsDeliveryContext::processApp(

// Early return.
// If all Apps return 'e_NO_CAPACITY_ALL', stop the iteration
// (d_isReady == false).
// (d_numApps == 0).

++d_numStops;

return false; // RETURN
}
Expand All @@ -750,7 +754,6 @@ bool QueueEngineUtil_AppsDeliveryContext::processApp(
}

// Still making progress (result != Routers::e_NO_CAPACITY_ALL)
d_isReady = true;

return (result == Routers::e_SUCCESS);
}
Expand Down Expand Up @@ -811,7 +814,7 @@ void QueueEngineUtil_AppsDeliveryContext::deliverMessage()
}
}

if (d_isReady) {
if (haveProgress()) {
d_currentMessage->advance();
}

Expand All @@ -823,6 +826,11 @@ bool QueueEngineUtil_AppsDeliveryContext::isEmpty() const
return d_consumers.empty();
}

bool QueueEngineUtil_AppsDeliveryContext::haveProgress() const
{
return (d_numStops < d_numApps || d_numApps == 0);
}

bsls::Types::Int64 QueueEngineUtil_AppsDeliveryContext::timeDelta()
{
if (!d_timeDelta.has_value()) {
Expand Down
9 changes: 5 additions & 4 deletions src/groups/mqb/mqbblp/mqbblp_queueengineutil.h
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,8 @@ struct QueueEngineUtil_AppsDeliveryContext {

private:
Consumers d_consumers;
bool d_isReady;
int d_numApps;
int d_numStops; // Apps not moving
mqbi::StorageIterator* d_currentMessage;
mqbi::Queue* d_queue_p;
bsl::optional<bsls::Types::Int64> d_timeDelta;
Expand All @@ -626,9 +627,6 @@ struct QueueEngineUtil_AppsDeliveryContext {
QueueEngineUtil_AppsDeliveryContext(mqbi::Queue* queue,
bslma::Allocator* allocator);

/// Start delivery cycle(s).
void start();

/// Prepare the context to process next message.
/// Return `true` if the delivery can continue iterating dataStream
/// The `false` return value indicates either the end of the dataStream or
Expand Down Expand Up @@ -662,6 +660,9 @@ struct QueueEngineUtil_AppsDeliveryContext {
/// Return `true` if there is at least one delivery target selected.
bool isEmpty() const;

/// Return `true` if not all Apps are at capacity or there are no Apps.
bool haveProgress() const;

bsls::Types::Int64 timeDelta();
};

Expand Down
27 changes: 16 additions & 11 deletions src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -580,8 +580,6 @@ void RelayQueueEngine::deliverMessages()
// 1. End of storage; or
// 2. All subStreams return 'e_NO_CAPACITY_ALL'

d_appsDeliveryContext.start();

while (d_appsDeliveryContext.reset(d_storageIter_mp.get())) {
// Assume, all Apps need to deliver (some may be at capacity)
unsigned int numApps = d_storageIter_mp->numApps();
Expand All @@ -604,14 +602,13 @@ void RelayQueueEngine::deliverMessages()

d_storageIter_mp->removeCurrentElement();
}

if (d_appsDeliveryContext.processApp(*app, i)) {
else if (d_appsDeliveryContext.processApp(*app, i)) {
// The current element has made it either to delivery or
// putAside or resumerPoint and it can be removed
// putAside and it can be removed
d_storageIter_mp->removeCurrentElement();
}
// Else, the current element has made it to resumerPoint and
// it cannot be removed
// Else, the current element has made it to resumePoint and it
// cannot be removed.
}
d_appsDeliveryContext.deliverMessage();
}
Expand Down Expand Up @@ -1919,14 +1916,22 @@ void RelayQueueEngine::storePush(mqbi::StorageMessageAttributes* attributes,
void RelayQueueEngine::beforeOneAppRemoved(unsigned int upstreamSubQueueId)
{
while (!d_storageIter_mp->atEnd()) {
if (d_storageIter_mp->numApps() > 1) {
const int numApps = d_storageIter_mp->numApps();
if (numApps > 1) {
// Removal of App's elements will not invalidate 'd_storageIter_mp'
break;
}
if (numApps == 1) {
const PushStream::Element* element = d_storageIter_mp->element(0);
if (element->app().d_app->upstreamSubQueueId() !=
upstreamSubQueueId) {
break;
}
}
else {
BSLS_ASSERT_SAFE(numApps == 0);

const PushStream::Element* element = d_storageIter_mp->element(0);
if (element->app().d_app->upstreamSubQueueId() != upstreamSubQueueId) {
break;
// The case when 'advance' does not follow 'removeCurrentElement'
}

d_storageIter_mp->advance();
Expand Down
1 change: 0 additions & 1 deletion src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1276,7 +1276,6 @@ void RootQueueEngine::afterNewMessage(
d_queueState_p->queue()));

// Deliver new messages to active (alive and capable to deliver) consumers
d_appsDeliveryContext.start();

while (d_appsDeliveryContext.reset(d_storageIter_mp.get())) {
// Assume, all Apps need to deliver (some may be at capacity)
Expand Down
7 changes: 4 additions & 3 deletions src/groups/mqb/mqbs/mqbs_virtualstoragecatalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -422,9 +422,10 @@ int VirtualStorageCatalog::addVirtualStorage(bsl::ostream& errorDescription,
appOrdinal = d_nextOrdinal++;
}
else {
appOrdinal = d_availableOrdinals.front();
AvailableOrdinals::const_iterator first = d_availableOrdinals.cbegin();
appOrdinal = *first;
// There is no conflict because everything 'appOrdinal' was removed.
d_availableOrdinals.pop_front();
d_availableOrdinals.erase(first);
}

BSLS_ASSERT_SAFE(appOrdinal <= d_virtualStorages.size());
Expand Down Expand Up @@ -468,7 +469,7 @@ bool VirtualStorageCatalog::removeVirtualStorage(
removeAll(appKey);

const VirtualStorage& vs = *it->value();
d_availableOrdinals.push_back(vs.ordinal());
d_availableOrdinals.insert(vs.ordinal());

if (d_queue_p) {
BSLS_ASSERT_SAFE(d_queue_p->queueEngine());
Expand Down
2 changes: 1 addition & 1 deletion src/groups/mqb/mqbs/mqbs_virtualstoragecatalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class VirtualStorageCatalog {
typedef bsl::shared_ptr<VirtualStorage> VirtualStorageSp;

/// List of available ordinal values for Virtual Storages.
typedef bsl::list<Ordinal> AvailableOrdinals;
typedef bsl::set<Ordinal> AvailableOrdinals;

/// appKey -> virtualStorage
typedef bmqc::
Expand Down

0 comments on commit a5f8a06

Please sign in to comment.