Skip to content
This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

Add a callback parameter to produced messages, called upon receipt #506

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

mikepk
Copy link

@mikepk mikepk commented Mar 17, 2016

Rather than iterate over a delivery report, an optional callback per message
is used that gets called when the message is marked as delivered. This
makes it easier to attach behaviors to particular messages without having to
correlate message bodies in a delivery report.

A sample use case is some kind of cleanup code that should only be run when a message has been guaranteed delivered. For example, a data / message bridge where messages can be dequeued (or acknowledged) from a master queue only when delivery is guaranteed in Kafka to avoid data loss. In my case, a rabbitMQ message broker is being replicated into Kafka. The message in Rabbit should only be ack'd when delivery is guaranteed otherwise there could be data loss.

This could be done by creating a registry of all sent messages and their cleanup code, then iterating over the delivery report and correlating the message with the registry, but that gets complicated and messy fast. This code uses the existing record of produced messages in pykafka (the message set / message_batch) and adds an optional callback to each message. When the delivery check is done, the callback attached to the message is executed if there is one. This keeps the async nature of the delivery check but allows attaching behaviors to the delivery check.

Rather than iterate over a delivery report, an optional callback per message
is used that gets called when the message is marked as delivered. This
makes it easier to attach behaviors to particular messages without having to
correlate message bodies in a delivery report.
@emmettbutler
Copy link
Contributor

Thanks @mikepk. I like this idea, and I think you've implemented it in a reasonable way that doesn't overly complicate the interface. Might it be useful for the callback function to accept the Message instance for which it was called? I'd also like to have @rduplain and/or @yungchin weigh in on this.

Some integration tests for the callback functions would be good as well, even if it's just a single test making sure that the function is called properly.

@emmettbutler emmettbutler self-assigned this Mar 22, 2016
@johnistan
Copy link
Contributor

i like the callback interface. two thoughts.

I am personally more interested when the message fails to be delivered so i would also push for a failure_callback as well.

second, make sure you catch any exception and bind it and allow the thread to report the exception. here is a pr that does that for the rebalance callback.

https://github.com/Parsely/pykafka/pull/411/files

@mikepk
Copy link
Author

mikepk commented Mar 23, 2016

When I have some spare time I'll go ahead and look into a failure_callback, exception, and tests. For failure callback are you thinking a call signature like def produce(self, message, partition_key=None, callback=None, errback=None):?

@emmettbutler
Copy link
Contributor

I'd advocate for not adding more than a single callback. Instead of a success callback and an error callback, we could easily get by with a single callback taking a parameter that indicates whether there was an error - maybe the exception instance.

@rduplain
Copy link
Contributor

Just to weigh in: a callback is interesting, and +1 to having a single callback that gets intuitive parameters for whether the message was successful.

@emmettbutler
Copy link
Contributor

This could use another look and some tests if you're still interested in getting it into master, @mikepk. Thanks again for the contribution!

@emmettbutler emmettbutler removed their assignment Jun 23, 2016
@emmettbutler emmettbutler mentioned this pull request Oct 2, 2017
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants