Skip to content

Commit

Permalink
Merge pull request #298 from CopernicaMarketingSoftware/synchronous-fix
Browse files Browse the repository at this point in the history
fix for incorrect handling of synchronous flag
  • Loading branch information
EmielBruijntjes authored Jun 19, 2019
2 parents f107b4a + 7b79b7c commit 5a648fe
Show file tree
Hide file tree
Showing 22 changed files with 29 additions and 29 deletions.
16 changes: 8 additions & 8 deletions include/amqpcpp/channelimpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -574,10 +574,10 @@ class ChannelImpl : public Watchable, public std::enable_shared_from_this<Channe
}

/**
* Signal the channel that a synchronous operation was completed.
* After this operation, waiting frames can be sent out.
* Signal the channel that a synchronous operation was completed, and that any
* queued frames can be sent out.
*/
void onSynchronized();
void flush();

/**
* Report to the handler that the channel is opened
Expand All @@ -596,8 +596,8 @@ class ChannelImpl : public Watchable, public std::enable_shared_from_this<Channe
// inform handler
if (_readyCallback) _readyCallback();

// if the monitor is still valid, we exit synchronous mode now
if (monitor.valid()) onSynchronized();
// if the monitor is still valid, we flush any waiting operations
if (monitor.valid()) flush();
}

/**
Expand Down Expand Up @@ -644,10 +644,10 @@ class ChannelImpl : public Watchable, public std::enable_shared_from_this<Channe
{
// skip if there is no oldest callback
if (!_oldestCallback) return true;

// the last (possibly synchronous) operation was received, so we're no longer in synchronous mode
if (_synchronous && _queue.empty()) _synchronous = false;

// flush the queue, which will send the next operation if the current operation was synchronous
flush();

// we are going to call callbacks that could destruct the channel
Monitor monitor(this);

Expand Down
2 changes: 1 addition & 1 deletion src/basiccancelokframe.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class BasicCancelOKFrame : public BasicFrame
if (!channel) return false;

// report
if (channel->reportSuccess<const std::string&>(consumerTag())) channel->onSynchronized();
channel->reportSuccess<const std::string&>(consumerTag());

// done
return true;
Expand Down
2 changes: 1 addition & 1 deletion src/basicconsumeokframe.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class BasicConsumeOKFrame : public BasicFrame
if (!channel) return false;

// report
if (channel->reportSuccess(consumerTag())) channel->onSynchronized();
channel->reportSuccess(consumerTag());

// done
return true;
Expand Down
2 changes: 1 addition & 1 deletion src/basicgetemptyframe.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class BasicGetEmptyFrame : public BasicFrame
if (!channel) return false;

// report
if (channel->reportSuccess()) channel->onSynchronized();
channel->reportSuccess();

// done
return true;
Expand Down
2 changes: 1 addition & 1 deletion src/basicqosokframe.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class BasicQosOKFrame : public BasicFrame
if (!channel) return false;

// report
if (channel->reportSuccess()) channel->onSynchronized();
channel->reportSuccess();

// done
return true;
Expand Down
2 changes: 1 addition & 1 deletion src/channelcloseokframe.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class ChannelCloseOKFrame : public ChannelFrame
if (!channel) return false;

// report that the channel is closed
if (channel->reportClosed()) channel->onSynchronized();
channel->reportClosed();

// done
return true;
Expand Down
2 changes: 1 addition & 1 deletion src/channelflowokframe.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class ChannelFlowOKFrame : public ChannelFrame
if (!channel) return false;

// report success for the call
if (channel->reportSuccess()) channel->onSynchronized();
channel->reportSuccess();

// done
return true;
Expand Down
2 changes: 1 addition & 1 deletion src/channelimpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -754,7 +754,7 @@ bool ChannelImpl::send(const Frame &frame)
* Signal the channel that a synchronous operation was completed. After
* this operation, waiting frames can be sent out.
*/
void ChannelImpl::onSynchronized()
void ChannelImpl::flush()
{
// we are no longer waiting for synchronous operations
_synchronous = false;
Expand Down
2 changes: 1 addition & 1 deletion src/confirmselectokframe.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class ConfirmSelectOKFrame : public ConfirmFrame
if(!channel) return false;

// report that the channel is open
if (channel->reportSuccess()) channel->onSynchronized();
channel->reportSuccess();

// done
return true;
Expand Down
2 changes: 1 addition & 1 deletion src/deferredget.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ const std::shared_ptr<Deferred> &DeferredGet::reportSuccess() const
void DeferredGet::complete()
{
// the channel is now synchronized, delayed frames may now be sent
_channel->onSynchronized();
_channel->flush();

// pass on to normal implementation
DeferredExtReceiver::complete();
Expand Down
2 changes: 1 addition & 1 deletion src/exchangebindokframe.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class ExchangeBindOKFrame : public ExchangeFrame
if(!channel) return false;

// report to handler
if (channel->reportSuccess()) channel->onSynchronized();
channel->reportSuccess();

// done
return true;
Expand Down
2 changes: 1 addition & 1 deletion src/exchangedeclareokframe.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class ExchangeDeclareOKFrame : public ExchangeFrame
if(!channel) return false;

// report exchange declare ok
if (channel->reportSuccess()) channel->onSynchronized();
channel->reportSuccess();

// done
return true;
Expand Down
2 changes: 1 addition & 1 deletion src/exchangedeleteokframe.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class ExchangeDeleteOKFrame : public ExchangeFrame
if(!channel) return false;

// report to handler
if (channel->reportSuccess()) channel->onSynchronized();
channel->reportSuccess();

// done
return true;
Expand Down
2 changes: 1 addition & 1 deletion src/exchangeunbindokframe.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class ExchangeUnbindOKFrame : public ExchangeFrame
if(!channel) return false;

// report to handler
if (channel->reportSuccess()) channel->onSynchronized();
channel->reportSuccess();

// done
return true;
Expand Down
2 changes: 1 addition & 1 deletion src/queuebindokframe.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class QueueBindOKFrame : public QueueFrame
if(!channel) return false;

// report to handler
if (channel->reportSuccess()) channel->onSynchronized();
channel->reportSuccess();

// done
return true;
Expand Down
2 changes: 1 addition & 1 deletion src/queuedeclareokframe.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ class QueueDeclareOKFrame : public QueueFrame
if (!channel) return false;

// report success
if (channel->reportSuccess(name(), messageCount(), consumerCount())) channel->onSynchronized();
channel->reportSuccess(name(), messageCount(), consumerCount());

// done
return true;
Expand Down
2 changes: 1 addition & 1 deletion src/queuedeleteokframe.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class QueueDeleteOKFrame : public QueueFrame
if(!channel) return false;

// report queue deletion success
if (channel->reportSuccess(this->messageCount())) channel->onSynchronized();
channel->reportSuccess(this->messageCount());

// done
return true;
Expand Down
2 changes: 1 addition & 1 deletion src/queuepurgeokframe.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class QueuePurgeOKFrame : public QueueFrame
if(!channel) return false;

// report queue purge success
if (channel->reportSuccess(this->messageCount())) channel->onSynchronized();
channel->reportSuccess(this->messageCount());

// done
return true;
Expand Down
2 changes: 1 addition & 1 deletion src/queueunbindokframe.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class QueueUnbindOKFrame : public QueueFrame
if(!channel) return false;

// report queue unbind success
if (channel->reportSuccess()) channel->onSynchronized();
channel->reportSuccess();

// done
return true;
Expand Down
2 changes: 1 addition & 1 deletion src/transactioncommitokframe.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class TransactionCommitOKFrame : public TransactionFrame
if(!channel) return false;

// report that the channel is open
if (channel->reportSuccess()) channel->onSynchronized();
channel->reportSuccess();

// done
return true;
Expand Down
2 changes: 1 addition & 1 deletion src/transactionrollbackokframe.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class TransactionRollbackOKFrame : public TransactionFrame
if(!channel) return false;

// report that the channel is open
if (channel->reportSuccess()) channel->onSynchronized();
channel->reportSuccess();

// done
return true;
Expand Down
2 changes: 1 addition & 1 deletion src/transactionselectokframe.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class TransactionSelectOKFrame : public TransactionFrame
if(!channel) return false;

// report that the channel is open
if (channel->reportSuccess()) channel->onSynchronized();
channel->reportSuccess();

// done
return true;
Expand Down

0 comments on commit 5a648fe

Please sign in to comment.