From abb69fa2cb5d6f41ee822fa3bea9e6f0582daa06 Mon Sep 17 00:00:00 2001 From: Chase Engelbrecht Date: Wed, 25 Oct 2023 14:31:00 -0500 Subject: [PATCH] Single instance per topic rather than per worker Signed-off-by: Chase Engelbrecht --- .../plugins/kafka/consumer/KafkaCustomConsumerFactory.java | 3 ++- .../dataprepper/plugins/kafka/source/KafkaSource.java | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java index 0251419c6b..83bca0b4a9 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java @@ -80,6 +80,7 @@ public List createConsumersForTopic(final KafkaConsumerConf try { final int numWorkers = topic.getWorkers(); + final TopicEmptinessMetadata topicEmptinessMetadata = new TopicEmptinessMetadata(); IntStream.range(0, numWorkers).forEach(index -> { KafkaDataConfig dataConfig = new KafkaDataConfigAdapter(keyFactory, topic); Deserializer keyDeserializer = (Deserializer) serializationFactory.getDeserializer(PlaintextKafkaDataConfig.plaintextDataConfig(dataConfig)); @@ -93,7 +94,7 @@ public List createConsumersForTopic(final KafkaConsumerConf final KafkaConsumer kafkaConsumer = new KafkaConsumer<>(consumerProperties, keyDeserializer, valueDeserializer); consumers.add(new KafkaCustomConsumer(kafkaConsumer, shutdownInProgress, buffer, kafkaConsumerConfig, topic, - schemaType, acknowledgementSetManager, byteDecoder, topicMetrics, new TopicEmptinessMetadata())); + schemaType, acknowledgementSetManager, byteDecoder, topicMetrics, topicEmptinessMetadata)); }); } catch (Exception e) { diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java index 0b49400dcd..f942b32152 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java @@ -117,6 +117,7 @@ public void start(Buffer> buffer) { int numWorkers = topic.getWorkers(); executorService = Executors.newFixedThreadPool(numWorkers); allTopicExecutorServices.add(executorService); + final TopicEmptinessMetadata topicEmptinessMetadata = new TopicEmptinessMetadata(); IntStream.range(0, numWorkers).forEach(index -> { while (true) { @@ -140,7 +141,7 @@ public void start(Buffer> buffer) { } consumer = new KafkaCustomConsumer(kafkaConsumer, shutdownInProgress, buffer, sourceConfig, topic, schemaType, - acknowledgementSetManager, null, topicMetrics, new TopicEmptinessMetadata()); + acknowledgementSetManager, null, topicMetrics, topicEmptinessMetadata); allTopicConsumers.add(consumer); executorService.submit(consumer);