Skip to content
This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

Add a callback parameter to produced messages, called upon receipt #506

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions pykafka/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,14 +267,16 @@ def stop_owned_brokers():
for queue_reader in queue_readers:
queue_reader.join()

def produce(self, message, partition_key=None):
def produce(self, message, partition_key=None, callback=None):
"""Produce a message.

:param message: The message to produce (use None to send null)
:type message: bytes
:param partition_key: The key to use when deciding which partition to send this
message to
:type partition_key: bytes
:param callback: function to call upon delivery receipt
:type callback: callable
"""
if not (isinstance(partition_key, bytes) or partition_key is None):
raise TypeError("Producer.produce accepts a bytes object as partition_key, "
Expand All @@ -292,7 +294,8 @@ def produce(self, message, partition_key=None):
partition_id=partition_id,
# We must pass our thread-local Queue instance directly,
# as results will be written to it in a worker thread
delivery_report_q=self._delivery_reports.queue)
delivery_report_q=self._delivery_reports.queue,
callback=callback)
self._produce(msg)

if self._synchronous:
Expand Down Expand Up @@ -373,6 +376,8 @@ def mark_as_delivered(message_batch):
owned_broker.increment_messages_pending(-1 * len(message_batch))
req.delivered += len(message_batch)
for msg in message_batch:
if msg.callback:
msg.callback()
self._delivery_reports.put(msg)

try:
Expand Down
8 changes: 6 additions & 2 deletions pykafka/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ class Message(Message, Serializable):
:ivar offset: The offset of the message
:ivar partition_id: The id of the partition to which this message belongs
:ivar delivery_report_q: For use by :class:`pykafka.producer.Producer`
:ivar callback: For use by :class:`pykafka.producer.Producer`
"""
MAGIC = 0

Expand All @@ -163,7 +164,8 @@ class Message(Message, Serializable):
"partition_id",
"partition",
"produce_attempt",
"delivery_report_q"
"delivery_report_q",
"callback"
]

def __init__(self,
Expand All @@ -173,7 +175,8 @@ def __init__(self,
offset=-1,
partition_id=-1,
produce_attempt=0,
delivery_report_q=None):
delivery_report_q=None,
callback=None):
self.compression_type = compression_type
self.partition_key = partition_key
self.value = value
Expand All @@ -186,6 +189,7 @@ def __init__(self,
self.produce_attempt = produce_attempt
# delivery_report_q is used by the producer
self.delivery_report_q = delivery_report_q
self.callback = callback

def __len__(self):
size = 4 + 1 + 1 + 4 + 4
Expand Down
9 changes: 6 additions & 3 deletions tests/pykafka/test_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ class TestFetchAPI(unittest2.TestCase):
'partition_id': 0,
'produce_attempt': 0,
'delivery_report_q': None,
'partition': None
'partition': None,
'callback': None
}, {
'partition_key': b'test_key',
'compression_type': 0,
Expand All @@ -114,7 +115,8 @@ class TestFetchAPI(unittest2.TestCase):
'partition_id': 0,
'produce_attempt': 0,
'delivery_report_q': None,
'partition': None
'partition': None,
'callback': None
}, {
'partition_key': None,
'compression_type': 0,
Expand All @@ -123,7 +125,8 @@ class TestFetchAPI(unittest2.TestCase):
'partition_id': 0,
'produce_attempt': 0,
'delivery_report_q': None,
'partition': None
'partition': None,
'callback': None
}]

def msg_to_dict(self, msg):
Expand Down