Replies: 2 comments 7 replies
-
RabbitMQ has already re-delivered the message to your consumer at this point. If I understand your second scenario where you have in-flight deliveries for your client to process but your client experiences an error, the simplest action is to close the channel. You don't even have to cancel the consumer as the .NET client will clean up consumer instances when the If you have further questions the best way to hash things out would be to provide some code I can clone, compile and run to see the behavior. Even better would be to make the repository available via a git hosting provider that allows pull requests so that some collaboration is possible. |
Beta Was this translation helpful? Give feedback.
-
If you need to requeue all delivered messages outstanding an ack, simply close the channel they were delivered on. You can check if a delivery is a repeated one from delivery metadata. With quorum queues, you can limit the maximum number of attempted redeliveries, too. |
Beta Was this translation helpful? Give feedback.
-
I have a pool of channels in my
dotnet
application, load balanced over several connections.I'd like to create multiple consumers, and I am using
AsyncEventingBasicConsumer
for that, eachAsyncEventingBasicConsumer
has its own channel leased from the channel pool.I'm handling the
Received
event.I understand there is a hidden "cache" of prefetched messages - for example
10
and that theReceived
event is fired for one message at a time but all 10 may be received in one go as far as rabbitmq is concerned all 10 are awaiting acknowledgement, until each one is the then processed by theReceive
handler on the consumer and Ack'd / Nack'd.Here is where my issues start (not surpisingly with additional complexity)
In order to take more control over concurrency concerns (in terms of how my application processes messages), in the
Received
event handler, I'm pushing the deserialized message (making sure to grab the byte[]'s whilst they are available within the handler as docs suggest) into my ownSystem.Threading.Channel
which acts as my own buffer. I'm going to refer to this as aSystem.Threading.Channel
as the termchannel
already refers to a rabbit mq channel. In this case I'm using an unbounded (no max limit on size), single writer, single readerSystem.Threading.Channel
- however in future scenarios I am planning to use a single writer,multiple reader
System.Threading.Channel
- where the multiple readers in my application would compete for messages from theSystem.Threading.Channel
. The latter will be for scenarios where I care about processing throughput more than preserving an ordered processing sequence.When the
AsyncEventingBasicConsumer
receives the message, within theReceived
event handler, I push my deserialized representation of the received message into theSystem.Threading.Channel
along with:-deliveryTag
BasicAck
orBasicNack
of this message on the channel later on.The reader of the
System.Threading.Channel
then reads messages asynchronously, and invokes theBasicAck
orBasicNack
callbacks depending upon if it processed the message successfully.My first questions it that I am seeing something funny behavior when the reader calls
BasicNack
(orBasicReject
) in this way. It appears to me that the messages does not return to the queue after callingBasicNack
. I am not using anyprefetch
countqos
setting so the behaviour I see with 3 messages ready on a queue, is that all 3 enter the stateready
together (i.e they are all prefetched together which I understand due to having not set a prefetch count all messages are sent together), then upon processing the first message from theSystem.Threading.Channel
the reader callsBasicNack
on that message (at this point there is also another message in the channel) but in rabbitmq after theBasicNack
is complete for this first message (also tried BasicReject) all 3 messages all still show as waiting acknowledgement.The messages do all return to the queue (ready state) if I close the channel..
However given that I'd like to return the channel to the channel pool I'd rather not have to close the channel, I'd rather keep the channel open for re-use.
So my first question is: I am wondering if there is any reason that might explain the the behaviour I think I am witnessing, in terms of BasicReject / BasicNack not working when called later on for message (out of band from the Receive event handler).
My next question is around re-use of channels amongst subscribers.
Assuming the single reader of the
System.Threading.Channel
hits a problem and can't process a message, and BasicNack`s it. It's use case is that if it can't process a message it stops there and will not process any more - as it cares about preserving order when processing. The idea is the issue with the current message must be resolved before processing would continue. It might be that it eventually gets moved to a dead letter queue for example and thats ok. However at the point the reader decides to "stop" at a message that cannot be processed, there might still be:-AsyncEventingBasicConsumer
has not firedReceived
for yet.AsyncEventingBasicConsumer
has fired theReceived
event for, and were subsequently pushed to theSystem.Threading.Channel
and are waiting there behind the current message for the reader to pick them up still - but it doesn't want to continue to process them as it wants to "stop" at the current message.In this situation in order to keep the channel alive, but "free up" the messages so they are available for redelivery to the next consumer that is created on the same channel (to replace the current consumer that has experienced the issue that may be transient), I think I need to do the following:
System.Threading.Channel
.I believe I can do 1, 3, and 4 but not sure how to go about 2, or if there is a better way to handle this scenario.
Beta Was this translation helpful? Give feedback.
All reactions