From 7aee9332253ac87cc4144fef2d83a007eec809cb Mon Sep 17 00:00:00 2001 From: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com> Date: Thu, 19 Dec 2024 09:24:33 -0500 Subject: [PATCH] addressing review Signed-off-by: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com> --- src/groups/mqb/mqbblp/mqbblp_pushstream.h | 20 +++++++++++-------- .../mqb/mqbblp/mqbblp_relayqueueengine.cpp | 8 +++++++- 2 files changed, 19 insertions(+), 9 deletions(-) diff --git a/src/groups/mqb/mqbblp/mqbblp_pushstream.h b/src/groups/mqb/mqbblp/mqbblp_pushstream.h index 60c966913..eb1c6854f 100644 --- a/src/groups/mqb/mqbblp/mqbblp_pushstream.h +++ b/src/groups/mqb/mqbblp/mqbblp_pushstream.h @@ -127,8 +127,12 @@ struct PushStream { void add(Element* element); void remove(Element* element); - /// Return 'true' - bool setLastPush(const bmqt::MessageGUID& lastGUID); + /// Return 'true' if the specified `guid` is the same as in the last + /// `setLastPush` call. + bool isLastPush(const bmqt::MessageGUID& guid); + + /// Cache the specified `guid` for subsequent checks by `isLastPush`. + void setLastPush(const bmqt::MessageGUID& guid); const Element* last() const; }; @@ -620,14 +624,14 @@ inline void PushStream::App::remove(Element* element) d_elements.remove(element, e_APP); } -inline bool PushStream::App::setLastPush(const bmqt::MessageGUID& lastGUID) +inline bool PushStream::App::isLastPush(const bmqt::MessageGUID& lastGUID) { - if (d_lastGUID == lastGUID) { - return false; - } - d_lastGUID = lastGUID; + return d_lastGUID == lastGUID; +} - return true; +inline void PushStream::App::setLastPush(const bmqt::MessageGUID& lastGUID) +{ + d_lastGUID = lastGUID; } inline const PushStream::Element* PushStream::App::last() const diff --git a/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp b/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp index b1e1581a6..52b481d46 100644 --- a/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp @@ -603,7 +603,13 @@ void RelayQueueEngine::deliverMessages() d_storageIter_mp->removeCurrentElement(); } - else if (element->app().setLastPush(d_storageIter_mp->guid())) { + else if (element->app().isLastPush(d_storageIter_mp->guid())) { + // This `app` has already seen this message. + d_storageIter_mp->removeCurrentElement(); + } + else { + element->app().setLastPush(d_storageIter_mp->guid()); + if (d_appsDeliveryContext.processApp(*app, i)) { // The current element has made it either to delivery or // putAside and it can be removed