Skip to content

Commit

Permalink
Set consumer back to poll. Add logger. Close consumer before process_…
Browse files Browse the repository at this point in the history
…vist.
  • Loading branch information
dspeck1 committed Oct 30, 2024
1 parent 6e5aefd commit 2db2b0e
Showing 1 changed file with 7 additions and 4 deletions.
11 changes: 7 additions & 4 deletions python/activator/activator.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ def keda_start():
fan_out_avro_deserializer = AvroDeserializer(schema_registry_client=fan_out_schema_registry_client,
from_dict=dict_to_fanned_out_visit)

fan_out_consumer = kafka.Consumer({
fan_out_consumer_conf = {
"bootstrap.servers": fan_out_kafka_cluster,
"group.id": fan_out_kafka_group_id,
"auto.offset.reset": fan_out_kafka_topic_offset,
Expand All @@ -261,7 +261,9 @@ def keda_start():
"sasl.username": fan_out_kafka_sasl_username,
"sasl.password": fan_out_kafka_sasl_password,
'enable.auto.commit': False
})
}

fan_out_consumer = kafka.Consumer(conf=fan_out_consumer_conf, logger=_log)

_log.info("starting fan out consumer")

Expand All @@ -270,7 +272,7 @@ def keda_start():

while True:
fan_out_consumer.subscribe([fan_out_kafka_topic])
fan_out_message = fan_out_consumer.consume(num_messages=1, timeout=5)
fan_out_message = fan_out_consumer.poll(timeout=5)
if fan_out_message is None:
continue
if fan_out_message.error():
Expand All @@ -286,11 +288,12 @@ def keda_start():
MessageField.VALUE))
_log.info("Unpacked message as %r.", deserialized_fan_out_visit)
fan_out_consumer.commit(message=fan_out_message, asynchronous=False)
process_visit(deserialized_fan_out_visit)
break
finally:
fan_out_consumer.close()

process_visit(deserialized_fan_out_visit)


def _graceful_shutdown(signum: int, stack_frame):
"""Signal handler for cases where the service should gracefully shut down.
Expand Down

0 comments on commit 2db2b0e

Please sign in to comment.