
Commit e52fc5c
dont set group id, use auto commit
hakan458 committed Nov 6, 2024
1 parent d524165
Showing 2 changed files with 6 additions and 6 deletions.
adala/environments/kafka.py: 6 changes (3 additions, 3 deletions)
@@ -55,9 +55,9 @@ async def initialize(self):
             self.kafka_input_topic,
             bootstrap_servers=self.kafka_bootstrap_servers,
             value_deserializer=lambda v: json.loads(v.decode("utf-8")),
-            enable_auto_commit=False,  # True by default which causes messages to be missed when using getmany()
+            # enable_auto_commit=False,  # True by default which causes messages to be missed when using getmany()
             auto_offset_reset="earliest",
-            group_id=self.kafka_input_topic,  # ensuring unique group_id to not mix up offsets between topics
+            # group_id=self.kafka_input_topic,  # ensuring unique group_id to not mix up offsets between topics
         )
         await self.consumer.start()

@@ -107,7 +107,7 @@ async def get_data_batch(self, batch_size: Optional[int]) -> InternalDataFrame:
         batch = await self.consumer.getmany(
             timeout_ms=self.timeout_ms, max_records=batch_size
         )
-        await self.consumer.commit()
+        # await self.consumer.commit()

         if len(batch) == 0:
             batch_data = []
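
For context, the net effect of this change in kafka.py: aiokafka's AIOKafkaConsumer defaults to enable_auto_commit=True and group_id=None, and aiokafka disables offset commits entirely when group_id is None, so the consumer now tracks its position only in memory. A minimal sketch of the post-change setup, assuming aiokafka (topic and server names are illustrative placeholders, not values from the repo):

import json

from aiokafka import AIOKafkaConsumer

async def make_consumer(topic: str, bootstrap_servers: str) -> AIOKafkaConsumer:
    # Post-change configuration: enable_auto_commit and group_id stay at their
    # defaults (True and None). With no group_id the consumer joins no consumer
    # group, offsets are never committed to Kafka, and
    # auto_offset_reset="earliest" applies on every fresh start.
    consumer = AIOKafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        value_deserializer=lambda v: json.loads(v.decode("utf-8")),
        auto_offset_reset="earliest",
    )
    await consumer.start()
    return consumer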
server/tasks/stream_inference.py: 6 changes (3 additions, 3 deletions)
@@ -140,9 +140,9 @@ async def async_process_streaming_output(
         output_topic_name,
         bootstrap_servers=settings.kafka_bootstrap_servers,
         value_deserializer=lambda v: json.loads(v.decode("utf-8")),
-        enable_auto_commit=False,  # True by default which causes messages to be missed when using getmany()
+        # enable_auto_commit=False,  # True by default which causes messages to be missed when using getmany()
         auto_offset_reset="earliest",
-        group_id=output_topic_name,  # ensuring unique group_id to not mix up offsets between topics
+        # group_id=output_topic_name,  # ensuring unique group_id to not mix up offsets between topics
     )
     await consumer.start()
     logger.info(f"consumer started {output_topic_name=}")
@@ -158,7 +158,7 @@ async def async_process_streaming_output(
     try:
         while not input_done.is_set():
             data = await consumer.getmany(timeout_ms=timeout_ms, max_records=batch_size)
-            await consumer.commit()
+            # await consumer.commit()
             for topic_partition, messages in data.items():
                 topic = topic_partition.topic
                 # messages is a list of ConsumerRecord
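
Likewise for the read loop in stream_inference.py: getmany() returns a dict mapping TopicPartition to lists of ConsumerRecord, and with no consumer group there is no commit() to pair with it. A sketch under the same aiokafka assumption (the print call is a placeholder for the real record handling):

from aiokafka import AIOKafkaConsumer

async def read_batch(consumer: AIOKafkaConsumer, timeout_ms: int, batch_size: int) -> None:
    # getmany() returns {TopicPartition: [ConsumerRecord, ...]}. No commit()
    # follows the fetch: aiokafka rejects commits when group_id is None, and
    # the consumer instance advances its own in-memory position.
    data = await consumer.getmany(timeout_ms=timeout_ms, max_records=batch_size)
    for topic_partition, messages in data.items():
        for record in messages:
            print(topic_partition.topic, record.value)  # placeholder handling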
