Skip to content

Commit

Permalink
add comments, use manual commit for output job
Browse files: browse the repository at this point in the history
  • Loading branch information
hakan458 committed Oct 30, 2024
1 parent 814fc5c commit 3172ad4
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 5 deletions.
6 changes: 3 additions & 3 deletions adala/environments/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,16 @@ async def initialize(self):
self.kafka_input_topic,
bootstrap_servers=self.kafka_bootstrap_servers,
value_deserializer=lambda v: json.loads(v.decode("utf-8")),
enable_auto_commit=False,
enable_auto_commit=False, # True by default which causes messages to be missed when using getmany()
auto_offset_reset="earliest",
group_id=self.kafka_input_topic,
group_id=self.kafka_input_topic, # ensuring unique group_id to not mix up offsets between topics
)
await self.consumer.start()

self.producer = AIOKafkaProducer(
bootstrap_servers=self.kafka_bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode("utf-8"),
acks='all'
acks='all' # waits for all in-sync replicas to acknowledge that they have written the message
)
await self.producer.start()

Expand Down
2 changes: 1 addition & 1 deletion server/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ async def submit_batch(batch: BatchData):
producer = AIOKafkaProducer(
bootstrap_servers=settings.kafka_bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode("utf-8"),
acks='all'
acks='all' # waits for all in-sync replicas to acknowledge that they have written the message
)
await producer.start()

Expand Down
4 changes: 3 additions & 1 deletion server/tasks/stream_inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,9 @@ async def async_process_streaming_output(
output_topic_name,
bootstrap_servers=settings.kafka_bootstrap_servers,
value_deserializer=lambda v: json.loads(v.decode("utf-8")),
enable_auto_commit=False, # True by default which causes messages to be missed when using getmany()
auto_offset_reset="earliest",
group_id=output_topic_name,
group_id=output_topic_name, # ensuring unique group_id to not mix up offsets between topics
)
await consumer.start()
logger.info(f"consumer started {output_topic_name=}")
Expand All @@ -157,6 +158,7 @@ async def async_process_streaming_output(
try:
while not input_done.is_set():
data = await consumer.getmany(timeout_ms=timeout_ms, max_records=batch_size)
await self.consumer.commit()
for topic_partition, messages in data.items():
topic = topic_partition.topic
if messages:
Expand Down

0 comments on commit 3172ad4

Please sign in to comment.