diff --git a/adala/environments/kafka.py b/adala/environments/kafka.py
index 429e4d3..67b2084 100644
--- a/adala/environments/kafka.py
+++ b/adala/environments/kafka.py
@@ -55,9 +55,9 @@ async def initialize(self):
             self.kafka_input_topic,
             bootstrap_servers=self.kafka_bootstrap_servers,
             value_deserializer=lambda v: json.loads(v.decode("utf-8")),
-            enable_auto_commit=False, # True by default which causes messages to be missed when using getmany()
+            #enable_auto_commit=False, # True by default which causes messages to be missed when using getmany()
             auto_offset_reset="earliest",
-            group_id=self.kafka_input_topic, # ensuring unique group_id to not mix up offsets between topics
+            #group_id=self.kafka_input_topic, # ensuring unique group_id to not mix up offsets between topics
         )
 
         await self.consumer.start()
@@ -107,7 +107,7 @@ async def get_data_batch(self, batch_size: Optional[int]) -> InternalDataFrame:
         batch = await self.consumer.getmany(
             timeout_ms=self.timeout_ms, max_records=batch_size
         )
-        await self.consumer.commit()
+        #await self.consumer.commit()
 
         if len(batch) == 0:
             batch_data = []
diff --git a/server/tasks/stream_inference.py b/server/tasks/stream_inference.py
index 79205c4..88a6b2c 100644
--- a/server/tasks/stream_inference.py
+++ b/server/tasks/stream_inference.py
@@ -140,9 +140,9 @@ async def async_process_streaming_output(
         output_topic_name,
         bootstrap_servers=settings.kafka_bootstrap_servers,
         value_deserializer=lambda v: json.loads(v.decode("utf-8")),
-        enable_auto_commit=False, # True by default which causes messages to be missed when using getmany()
+        #enable_auto_commit=False, # True by default which causes messages to be missed when using getmany()
         auto_offset_reset="earliest",
-        group_id=output_topic_name, # ensuring unique group_id to not mix up offsets between topics
+        #group_id=output_topic_name, # ensuring unique group_id to not mix up offsets between topics
     )
     await consumer.start()
     logger.info(f"consumer started {output_topic_name=}")
@@ -158,7 +158,7 @@ async def async_process_streaming_output(
     try:
         while not input_done.is_set():
             data = await consumer.getmany(timeout_ms=timeout_ms, max_records=batch_size)
-            await consumer.commit()
+            #await consumer.commit()
             for topic_partition, messages in data.items():
                 topic = topic_partition.topic
                 # messages is a list of ConsumerRecord
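
For context, a minimal runnable sketch of how the consumer is configured after this change (the topic name and broker address below are hypothetical placeholders, not values from the patch): with enable_auto_commit and group_id left at their aiokafka defaults (True and None), the consumer runs without consumer-group coordination, so no offsets are committed at all and auto_offset_reset="earliest" alone determines where reading starts.

```python
import asyncio
import json

from aiokafka import AIOKafkaConsumer


async def consume_one_batch(
    topic: str = "example_topic",               # hypothetical topic name
    bootstrap_servers: str = "localhost:9092",  # hypothetical broker address
    batch_size: int = 100,
    timeout_ms: int = 1000,
) -> None:
    # Mirrors the post-change configuration: enable_auto_commit and group_id
    # are left at their defaults (True / None). With group_id=None, aiokafka
    # skips consumer-group coordination and never commits offsets, so
    # auto_offset_reset="earliest" decides the start position on each run.
    consumer = AIOKafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        value_deserializer=lambda v: json.loads(v.decode("utf-8")),
        auto_offset_reset="earliest",
    )
    await consumer.start()
    try:
        # getmany() returns {TopicPartition: [ConsumerRecord, ...]}; no manual
        # commit() follows it, matching the calls removed in this diff.
        batch = await consumer.getmany(timeout_ms=timeout_ms, max_records=batch_size)
        for topic_partition, messages in batch.items():
            for message in messages:
                print(topic_partition.topic, message.value)
    finally:
        await consumer.stop()


if __name__ == "__main__":
    asyncio.run(consume_one_batch())
```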