From 2fe89cfb9cf4c10a3c9dff61e284004eec8256e5 Mon Sep 17 00:00:00 2001 From: Papa Bakary Camara Date: Fri, 16 Sep 2022 07:08:14 +0200 Subject: [PATCH] Extra debug message in sha_extractor publisher --- ccx_messaging/publishers/sha_publisher.py | 30 +++++++++++++++++++---- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/ccx_messaging/publishers/sha_publisher.py b/ccx_messaging/publishers/sha_publisher.py index e2d9d10..5f410f1 100644 --- a/ccx_messaging/publishers/sha_publisher.py +++ b/ccx_messaging/publishers/sha_publisher.py @@ -14,8 +14,8 @@ """Module that implements a custom Kafka publisher.""" -import logging import json +import logging from insights_messaging.publishers import Publisher from kafka import KafkaProducer @@ -63,9 +63,6 @@ def publish(self, input_msg, response): a byte array using UTF-8 encoding and the result of that will be sent to the producer to produce a message in the output Kafka topic. """ - # output message in form of a dictionary - output_msg = {} - # read all required attributes from input_msg try: org_id = int(input_msg.value["identity"]["identity"]["internal"]["org_id"]) @@ -82,12 +79,13 @@ def publish(self, input_msg, response): if response is not None: try: + msg_timestamp = input_msg.value["timestamp"] output_msg = { "OrgID": org_id, "AccountNumber": account_number, "ClusterName": input_msg.value["ClusterName"], "Images": json.loads(response), - "LastChecked": input_msg.value["timestamp"], + "LastChecked": msg_timestamp, "Version": self.outdata_schema_version, "RequestId": input_msg.value.get("request_id"), } @@ -100,6 +98,28 @@ def publish(self, input_msg, response): # Convert message string into a byte array. self.producer.send(self.topic, message.encode("utf-8")) LOG.debug("Message has been sent successfully.") + LOG.debug( + "Message context: OrgId=%s, AccountNumber=%s, " + 'ClusterName="%s", NumImages: %d, LastChecked="%s, Version=%d"', + output_msg["OrgID"], + output_msg["AccountNumber"], + output_msg["ClusterName"], + len(output_msg["Images"]), + output_msg["LastChecked"], + output_msg["Version"], + ) + + LOG.info( + "Status: Success; " + "Topic: %s; " + "Partition: %s; " + "Offset: %s; " + "LastChecked: %s", + input_msg.topic, + input_msg.partition, + input_msg.offset, + msg_timestamp, + ) except UnicodeEncodeError as err: raise CCXMessagingError( f"Error encoding the response to publish: {message}"