From 045f7e2e975ae0d934e9ee15200ff6e1edc694e8 Mon Sep 17 00:00:00 2001 From: Marek Schmidt Date: Mon, 9 Sep 2024 10:49:04 +0200 Subject: [PATCH] LoomKafkaProducer|Consumer let the background thread finish itself (#4013) --- .../dispatcherloom/LoomKafkaConsumer.java | 20 +++++-------------- .../receiverloom/LoomKafkaProducer.java | 9 ++++++--- 2 files changed, 11 insertions(+), 18 deletions(-) diff --git a/data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java b/data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java index 164913d822..f34af4371a 100644 --- a/data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java +++ b/data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; @@ -44,7 +45,6 @@ public class LoomKafkaConsumer implements ReactiveKafkaConsumer { private final Consumer consumer; private final BlockingQueue taskQueue; private final AtomicBoolean isClosed; - private final AtomicBoolean isFinished; private final Thread taskRunnerThread; private final Promise closePromise = Promise.promise(); @@ -52,7 +52,6 @@ public LoomKafkaConsumer(Vertx vertx, Consumer consumer) { this.consumer = consumer; this.taskQueue = new LinkedBlockingQueue<>(); this.isClosed = new AtomicBoolean(false); - this.isFinished = new AtomicBoolean(false); if (Boolean.parseBoolean(System.getenv("ENABLE_VIRTUAL_THREADS"))) { this.taskRunnerThread = Thread.ofVirtual().start(this::processTaskQueue); @@ -74,14 +73,15 @@ private void processTaskQueue() { // Process queue elements until this is closed and the tasks queue is empty while (!isClosed.get() || !taskQueue.isEmpty()) { try { - taskQueue.take().run(); + Runnable task = taskQueue.poll(2000, TimeUnit.MILLISECONDS); + if (task != null) { + task.run(); + } } catch (InterruptedException e) { logger.debug("Interrupted while waiting for task", e); break; } } - - isFinished.set(true); } @Override @@ -126,16 +126,6 @@ public Future close() { } logger.debug("Queue is empty"); - if (!isFinished.get()) { - logger.debug("Background thread not finished yet, waiting for it to complete"); - Thread.sleep(2000L); - - if (!isFinished.get()) { - logger.debug("Background thread still not finished yet, interrupting background thread"); - taskRunnerThread.interrupt(); - } - } - taskRunnerThread.join(); closePromise.tryComplete(); diff --git a/data-plane/receiver-loom/src/main/java/dev/knative/eventing/kafka/broker/receiverloom/LoomKafkaProducer.java b/data-plane/receiver-loom/src/main/java/dev/knative/eventing/kafka/broker/receiverloom/LoomKafkaProducer.java index 402760854e..a1ba796715 100644 --- a/data-plane/receiver-loom/src/main/java/dev/knative/eventing/kafka/broker/receiverloom/LoomKafkaProducer.java +++ b/data-plane/receiver-loom/src/main/java/dev/knative/eventing/kafka/broker/receiverloom/LoomKafkaProducer.java @@ -26,6 +26,7 @@ import java.util.Objects; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -84,7 +85,11 @@ private void sendFromQueue() { // Process queue elements until this is closed and the tasks queue is empty while (!isClosed.get() || !eventQueue.isEmpty()) { try { - final var recordPromise = eventQueue.take(); + final var recordPromise = eventQueue.poll(2000, TimeUnit.MILLISECONDS); + if (recordPromise == null) { + continue; + } + final var startedSpan = this.tracer == null ? null : this.tracer.prepareSendMessage(recordPromise.getContext(), recordPromise.getRecord()); @@ -140,8 +145,6 @@ public Future close() { logger.debug("Waiting for the eventQueue to become empty"); Thread.sleep(2000L); } - logger.debug("Interrupting sendFromQueueThread thread"); - sendFromQueueThread.interrupt(); logger.debug("Waiting for sendFromQueueThread thread to complete"); sendFromQueueThread.join(); logger.debug("Closing the producer");