diff --git a/pykafka/balancedconsumer.py b/pykafka/balancedconsumer.py index 0e5b78dfe..a6b70b98d 100644 --- a/pykafka/balancedconsumer.py +++ b/pykafka/balancedconsumer.py @@ -102,7 +102,8 @@ def __init__(self, use_rdkafka=False, compacted_topic=False, membership_protocol=RangeProtocol, - deserializer=None): + deserializer=None, + reset_offset_on_fetch=True): """Create a BalancedConsumer instance :param topic: The topic this consumer should consume @@ -213,6 +214,9 @@ def __init__(self, fields transformed according to the client code's serialization logic. See `pykafka.utils.__init__` for stock implemtations. :type deserializer: function + :param reset_offset_on_fetch: Whether to update offsets during fetch_offsets. + Disable for read-only use cases to prevent side-effects. + :type reset_offset_on_fetch: bool """ self._cluster = cluster try: @@ -248,6 +252,7 @@ def __init__(self, self._is_compacted_topic = compacted_topic self._membership_protocol = membership_protocol self._deserializer = deserializer + self._reset_offset_on_fetch = reset_offset_on_fetch if not rdkafka and use_rdkafka: raise ImportError("use_rdkafka requires rdkafka to be installed") @@ -445,7 +450,8 @@ def _get_internal_consumer(self, partitions=None, start=True): reset_offset_on_start=reset_offset_on_start, auto_start=False, compacted_topic=self._is_compacted_topic, - deserializer=self._deserializer + deserializer=self._deserializer, + reset_offset_on_fetch=self._reset_offset_on_fetch ) cns.consumer_id = self._consumer_id cns.generation_id = self._generation_id diff --git a/pykafka/managedbalancedconsumer.py b/pykafka/managedbalancedconsumer.py index 9f90e593a..eb78e68eb 100644 --- a/pykafka/managedbalancedconsumer.py +++ b/pykafka/managedbalancedconsumer.py @@ -74,7 +74,8 @@ def __init__(self, compacted_topic=True, heartbeat_interval_ms=3000, membership_protocol=RangeProtocol, - deserializer=None): + deserializer=None, + reset_offset_on_fetch=True): """Create a ManagedBalancedConsumer instance :param topic: The topic this consumer should consume @@ -177,6 +178,9 @@ def __init__(self, fields transformed according to the client code's serialization logic. See `pykafka.utils.__init__` for stock implemtations. :type deserializer: function + :param reset_offset_on_fetch: Whether to update offsets during fetch_offsets. + Disable for read-only use cases to prevent side-effects. + :type reset_offset_on_fetch: bool """ self._cluster = cluster @@ -209,6 +213,7 @@ def __init__(self, self._membership_protocol.metadata.topic_names = [self._topic.name] self._heartbeat_interval_ms = valid_int(heartbeat_interval_ms) self._deserializer = deserializer + self._reset_offset_on_fetch = reset_offset_on_fetch if use_rdkafka is True: raise ImportError("use_rdkafka is not available for {}".format( self.__class__.__name__)) diff --git a/pykafka/simpleconsumer.py b/pykafka/simpleconsumer.py index 07484e44a..8b4670355 100644 --- a/pykafka/simpleconsumer.py +++ b/pykafka/simpleconsumer.py @@ -202,6 +202,7 @@ def __init__(self, self._generation_id = -1 self._consumer_id = b'' self._deserializer = deserializer + self._reset_offset_on_fetch = reset_offset_on_fetch # incremented for any message arrival from any partition # the initial value is 0 (no messages waiting) @@ -236,7 +237,6 @@ def __init__(self, self.partition_cycle = itertools.cycle(self._partitions.values()) self._default_error_handlers = self._build_default_error_handlers() - self._reset_offset_on_fetch = reset_offset_on_fetch if self._auto_start: self.start()