Skip to content

Commit

Permalink
addressing review
Browse files Browse the repository at this point in the history
Signed-off-by: dorjesinpo <[email protected]>
  • Loading branch information
dorjesinpo committed Dec 19, 2024
1 parent a6dcd0d commit 7aee933
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 9 deletions.
20 changes: 12 additions & 8 deletions src/groups/mqb/mqbblp/mqbblp_pushstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,12 @@ struct PushStream {
void add(Element* element);
void remove(Element* element);

/// Return 'true'
bool setLastPush(const bmqt::MessageGUID& lastGUID);
/// Return 'true' if the specified `guid` is the same as in the last
/// `setLastPush` call.
bool isLastPush(const bmqt::MessageGUID& guid);

/// Cache the specified `guid` for subsequent checks by `isLastPush`.
void setLastPush(const bmqt::MessageGUID& guid);

const Element* last() const;
};
Expand Down Expand Up @@ -620,14 +624,14 @@ inline void PushStream::App::remove(Element* element)
d_elements.remove(element, e_APP);
}

inline bool PushStream::App::setLastPush(const bmqt::MessageGUID& lastGUID)
inline bool PushStream::App::isLastPush(const bmqt::MessageGUID& lastGUID)
{
if (d_lastGUID == lastGUID) {
return false;
}
d_lastGUID = lastGUID;
return d_lastGUID == lastGUID;
}

return true;
inline void PushStream::App::setLastPush(const bmqt::MessageGUID& lastGUID)
{
d_lastGUID = lastGUID;
}

inline const PushStream::Element* PushStream::App::last() const
Expand Down
8 changes: 7 additions & 1 deletion src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,13 @@ void RelayQueueEngine::deliverMessages()

d_storageIter_mp->removeCurrentElement();
}
else if (element->app().setLastPush(d_storageIter_mp->guid())) {
else if (element->app().isLastPush(d_storageIter_mp->guid())) {
// This `app` has already seen this message.
d_storageIter_mp->removeCurrentElement();
}
else {
element->app().setLastPush(d_storageIter_mp->guid());

if (d_appsDeliveryContext.processApp(*app, i)) {
// The current element has made it either to delivery or
// putAside and it can be removed
Expand Down

0 comments on commit 7aee933

Please sign in to comment.