diff --git a/galasa-extensions-parent/dev.galasa.events.kafka/bnd.bnd b/galasa-extensions-parent/dev.galasa.events.kafka/bnd.bnd index d9130100..6096fe7b 100644 --- a/galasa-extensions-parent/dev.galasa.events.kafka/bnd.bnd +++ b/galasa-extensions-parent/dev.galasa.events.kafka/bnd.bnd @@ -5,6 +5,7 @@ Bundle-License: https://www.eclipse.org/legal/epl-2.0 Export-Package: dev.galasa.events.kafka* Import-Package: \ dev.galasa.framework.spi,\ + org.apache.commons.logging,\ org.apache.kafka.clients,\ org.apache.kafka.clients.admin,\ org.apache.kafka.clients.consumer,\ diff --git a/galasa-extensions-parent/dev.galasa.events.kafka/src/main/java/dev/galasa/events/kafka/internal/KafkaEventsService.java b/galasa-extensions-parent/dev.galasa.events.kafka/src/main/java/dev/galasa/events/kafka/internal/KafkaEventsService.java index 35b63c68..e022105b 100644 --- a/galasa-extensions-parent/dev.galasa.events.kafka/src/main/java/dev/galasa/events/kafka/internal/KafkaEventsService.java +++ b/galasa-extensions-parent/dev.galasa.events.kafka/src/main/java/dev/galasa/events/kafka/internal/KafkaEventsService.java @@ -15,8 +15,13 @@ import java.util.HashMap; import java.util.Properties; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + public class KafkaEventsService implements IEventsService { + private final Log logger = LogFactory.getLog(getClass()); + private IConfigurationPropertyStoreService cps; private IEventProducerFactory producerFactory; @@ -47,11 +52,14 @@ public void produceEvent(String topic, IEvent event) throws EventsException { producer = producers.get(topic); if (producer == null) { + logger.info("Creating a new producer as one does not exist for the topic " + topic); Properties properties = this.producerFactory.createProducerConfig(cps, topic); producer = this.producerFactory.createProducer(properties, topic); producers.put(topic, producer); + } else { + logger.info("Using the cached producer for the topic " + topic); } } @@ -62,7 +70,7 @@ public void produceEvent(String topic, IEvent event) throws EventsException { @Override public void shutdown() { - // Shut down all cached EventProducers + logger.info("Shutting down all cached producers"); for (Map.Entry entry : producers.entrySet()) { entry.getValue().close(); }