Skip to content

Commit

Permalink
fix: protecting RemoteQueue::expirePendingMessagesDispatched
Browse files Browse the repository at this point in the history
Signed-off-by: dorjesinpo <[email protected]>
  • Loading branch information
dorjesinpo committed Dec 19, 2024
1 parent 5a6670d commit b37ab87
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 4 deletions.
15 changes: 11 additions & 4 deletions src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
#include <bmqu_blob.h>
#include <bmqu_memoutstream.h>
#include <bmqu_printutil.h>
#include <bmqu_weakmemfn.h>

// BDE
#include <ball_severity.h>
Expand Down Expand Up @@ -454,7 +455,8 @@ RemoteQueue::RemoteQueue(QueueState* state,
int ackWindowSize,
StateSpPool* statePool,
bslma::Allocator* allocator)
: d_state_p(state)
: d_self(this, allocator)
, d_state_p(state)
, d_queueEngine_mp(0)
, d_pendingMessages(allocator)
, d_pendingConfirms(allocator)
Expand Down Expand Up @@ -553,6 +555,7 @@ void RemoteQueue::resetState()

erasePendingMessages(d_pendingMessages.end());

d_self.invalidate();
if (d_pendingMessagesTimerEventHandle) {
scheduler()->cancelEventAndWait(&d_pendingMessagesTimerEventHandle);
// 'expirePendingMessagesDispatched' does not restart timer if
Expand Down Expand Up @@ -585,6 +588,7 @@ void RemoteQueue::close()

BSLS_ASSERT_SAFE(d_pendingMessages.size() == 0);

d_self.invalidate();
if (d_pendingMessagesTimerEventHandle) {
scheduler()->cancelEventAndWait(&d_pendingMessagesTimerEventHandle);
// 'expirePendingMessagesDispatched' does not restart timer if
Expand Down Expand Up @@ -970,8 +974,9 @@ void RemoteQueue::postMessage(const bmqp::PutHeader& putHeaderIn,
scheduler()->scheduleEvent(
&d_pendingMessagesTimerEventHandle,
time,
bdlf::BindUtil::bind(&RemoteQueue::expirePendingMessages,
this));
bdlf::BindUtil::bind(bmqu::WeakMemFnUtil::weakMemFn(
&RemoteQueue::expirePendingMessages,
d_self.acquireWeak())));
}
}

Expand Down Expand Up @@ -1355,7 +1360,9 @@ void RemoteQueue::expirePendingMessagesDispatched()
scheduler()->scheduleEvent(
&d_pendingMessagesTimerEventHandle,
time,
bdlf::BindUtil::bind(&RemoteQueue::expirePendingMessages, this));
bdlf::BindUtil::bind(bmqu::WeakMemFnUtil::weakMemFn(
&RemoteQueue::expirePendingMessages,
d_self.acquireWeak())));

BALL_LOG_DEBUG << d_state_p->uri() << ": will check again to expire"
<< " pending PUSH messages in "
Expand Down
3 changes: 3 additions & 0 deletions src/groups/mqb/mqbblp/mqbblp_remotequeue.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
#include <bmqp_protocol.h>

#include <bmqc_orderedhashmap.h>
#include <bmqu_sharedresource.h>

// BDE
#include <ball_log.h>
Expand Down Expand Up @@ -184,6 +185,8 @@ class RemoteQueue {

private:
// DATA
bmqu::SharedResource<RemoteQueue> d_self;

QueueState* d_state_p;

bslma::ManagedPtr<RelayQueueEngine> d_queueEngine_mp;
Expand Down

0 comments on commit b37ab87

Please sign in to comment.