Skip to content

Commit

Permalink
fix: dia-1330: Adding log messages to capture message into and out of…
Browse files Browse the repository at this point in the history
… kafka topic. (#177)

Co-authored-by: Forum Gala <[email protected]>
  • Loading branch information
forum-hs and Forum Gala authored Aug 12, 2024
1 parent 33d72e5 commit 8cd9081
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 1 deletion.
7 changes: 6 additions & 1 deletion adala/environments/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,15 @@ async def save(self):
async def message_sender(
self, producer: AIOKafkaProducer, data: Iterable, topic: str
):
record_no = 0
try:
for record in data:
await producer.send_and_wait(topic, value=record)
record_no += 1
# print_text(f"Sent message: {record} to {topic=}")
logger.info(
f"The number of records sent to topic:{topic}, record_no:{record_no}"
)
finally:
pass
# print_text(f"No more messages for {topic=}")
Expand All @@ -116,7 +121,7 @@ async def get_data_batch(self, batch_size: Optional[int]) -> InternalDataFrame:
batch_data = [msg.value for msg in messages]

logger.info(
f"Received a batch of {len(batch_data)} records from Kafka topic {self.kafka_input_topic}"
f"Received a batch with number_of_messages:{len(batch_data)} records from Kafka input_topic:{self.kafka_input_topic}"
)
return InternalDataFrame(batch_data)

Expand Down
3 changes: 3 additions & 0 deletions server/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,9 @@ async def submit_batch(batch: BatchData):
# FIXME Temporary workaround for messages getting dropped.
# Remove once our kafka messaging is more reliable.
time.sleep(0.1)
logger.info(
f"The number of records sent to input_topic:{topic} record_no:{len(batch.data)}"
)
except UnknownTopicOrPartitionError:
await producer.stop()
raise HTTPException(
Expand Down

0 comments on commit 8cd9081

Please sign in to comment.