From 8cd908178d39c202bed3f689663acacf018701c9 Mon Sep 17 00:00:00 2001 From: forum-hs <156860265+forum-hs@users.noreply.github.com> Date: Mon, 12 Aug 2024 12:04:27 -0700 Subject: [PATCH] fix: dia-1330: Adding log messages to capture message into and out of kafka topic. (#177) Co-authored-by: Forum Gala --- adala/environments/kafka.py | 7 ++++++- server/app.py | 3 +++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/adala/environments/kafka.py b/adala/environments/kafka.py index b84064d6..ea1dd023 100644 --- a/adala/environments/kafka.py +++ b/adala/environments/kafka.py @@ -91,10 +91,15 @@ async def save(self): async def message_sender( self, producer: AIOKafkaProducer, data: Iterable, topic: str ): + record_no = 0 try: for record in data: await producer.send_and_wait(topic, value=record) + record_no += 1 # print_text(f"Sent message: {record} to {topic=}") + logger.info( + f"The number of records sent to topic:{topic}, record_no:{record_no}" + ) finally: pass # print_text(f"No more messages for {topic=}") @@ -116,7 +121,7 @@ async def get_data_batch(self, batch_size: Optional[int]) -> InternalDataFrame: batch_data = [msg.value for msg in messages] logger.info( - f"Received a batch of {len(batch_data)} records from Kafka topic {self.kafka_input_topic}" + f"Received a batch with number_of_messages:{len(batch_data)} records from Kafka input_topic:{self.kafka_input_topic}" ) return InternalDataFrame(batch_data) diff --git a/server/app.py b/server/app.py index 9bc5858b..57617ba7 100644 --- a/server/app.py +++ b/server/app.py @@ -179,6 +179,9 @@ async def submit_batch(batch: BatchData): # FIXME Temporary workaround for messages getting dropped. # Remove once our kafka messaging is more reliable. time.sleep(0.1) + logger.info( + f"The number of records sent to input_topic:{topic} record_no:{len(batch.data)}" + ) except UnknownTopicOrPartitionError: await producer.stop() raise HTTPException(