Skip to content

Commit

Permalink
now we always flush when a success is reported, and the funcction is …
Browse files Browse the repository at this point in the history
…renamed to flush
  • Loading branch information
mvdwerve committed Jun 19, 2019
1 parent c82fce8 commit 7b79b7c
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 22 deletions.
27 changes: 7 additions & 20 deletions include/amqpcpp/channelimpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -574,10 +574,10 @@ class ChannelImpl : public Watchable, public std::enable_shared_from_this<Channe
}

/**
* Signal the channel that a synchronous operation was completed.
* After this operation, waiting frames can be sent out.
* Signal the channel that a synchronous operation was completed, and that any
* queued frames can be sent out.
*/
void onSynchronized();
void flush();

/**
* Report to the handler that the channel is opened
Expand All @@ -596,8 +596,8 @@ class ChannelImpl : public Watchable, public std::enable_shared_from_this<Channe
// inform handler
if (_readyCallback) _readyCallback();

// if the monitor is still valid, we exit synchronous mode now
if (monitor.valid()) onSynchronized();
// if the monitor is still valid, we flush any waiting operations
if (monitor.valid()) flush();
}

/**
Expand Down Expand Up @@ -644,19 +644,10 @@ class ChannelImpl : public Watchable, public std::enable_shared_from_this<Channe
{
// skip if there is no oldest callback
if (!_oldestCallback) return true;

// remember whether or not we were synchronous
bool synchronous = _synchronous;

// is the queue empty at this moment?
bool empty = _queue.empty();
// flush the queue, which will send the next operation if the current operation was synchronous
flush();

// the last (possibly synchronous) operation was received, so we're no longer in synchronous mode. this
// is an optimization that makes sure that the first instruction _after_ a synchronous instruction
// that is installed during the success callback we make later does _not_ need to be buffered, but can be
// sent directly
if (synchronous && empty) _synchronous = false;

// we are going to call callbacks that could destruct the channel
Monitor monitor(this);

Expand All @@ -670,10 +661,6 @@ class ChannelImpl : public Watchable, public std::enable_shared_from_this<Channe
// leap out if channel no longer exists
if (!monitor.valid()) return false;

// if we were synchronous, but there were still messages in the queue, we process the queue now, because the synchronous
// operation was finished, and its callback was made, which means we're no longer in synchronous mode
if (synchronous && !empty) onSynchronized();

// set the oldest callback
_oldestCallback = next;

Expand Down
2 changes: 1 addition & 1 deletion src/channelimpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -754,7 +754,7 @@ bool ChannelImpl::send(const Frame &frame)
* Signal the channel that a synchronous operation was completed. After
* this operation, waiting frames can be sent out.
*/
void ChannelImpl::onSynchronized()
void ChannelImpl::flush()
{
// we are no longer waiting for synchronous operations
_synchronous = false;
Expand Down
2 changes: 1 addition & 1 deletion src/deferredget.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ const std::shared_ptr<Deferred> &DeferredGet::reportSuccess() const
void DeferredGet::complete()
{
// the channel is now synchronized, delayed frames may now be sent
_channel->onSynchronized();
_channel->flush();

// pass on to normal implementation
DeferredExtReceiver::complete();
Expand Down

0 comments on commit 7b79b7c

Please sign in to comment.