From 300a8d521e0612d070bb5d3b77975418baa4737b Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 9 Aug 2023 21:06:03 +0800 Subject: [PATCH] Fix topic lookup failed exceptionally with ServiceUnitNotReady ### Motivation `testMultiBrokerUnloadReload` is flaky with the Pulsar dependency upgraded to 3.1.0-SNAPSHOT. Here are the related logs: ``` 02:37:33.275 [pulsar-ph-kafka-644-3:io.streamnative.pulsar.handlers.kop.KafkaTopicLookupService@105] ERROR io.streamnative.pulsar.handlers.kop.KafkaTopicLookupService - [io.streamnative.pulsar.handlers.kop.storage.PartitionLog@131e450c] Failed t o getTopic persistent://public/default/kopMultiBrokerUnloadReload10-partition-3. exception: java.util.concurrent.CompletionException: org.apache.pulsar.broker.service.BrokerServiceException$ServiceUnitNotReadyException: ``` It's because the exception passed from `BrokerService#getTopic` is `CompletionException`, not `ServiceUnitNotReadyException`, while KoP does not get the cause of a `CompletionException` in `handleGetTopicException`. And then a `UNKNOWN_SERVER_ERROR` is returned to the client. ``` 02:37:33.277 [pulsar-ph-kafka-644-3:io.streamnative.pulsar.handlers.kop.storage.ReplicaManager$PendingProduceCallback@121] DEBUG io.streamnative.pulsar.handlers.kop.storage.ReplicaManager - Complete handle appendRecords. {kopMultiBrokerUnloadReload10-3={error: UNKNOWN_SERVER_ERROR,offset: -1,logAppendTime: -1, logStartOffset: -1, recordErrors: [], errorMessage: null}} ``` ### Modifications Unwrap the `CompletionException` in `handleGetTopicException`. --- .../pulsar/handlers/kop/KafkaTopicLookupService.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicLookupService.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicLookupService.java index a4e08ea168..04d17ae992 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicLookupService.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicLookupService.java @@ -15,6 +15,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.service.BrokerService; @@ -96,8 +97,9 @@ private void handleGetTopicException(@NonNull final String topicName, final CompletableFuture> topicCompletableFuture, @NonNull final Throwable ex, @NonNull final Object requestor) { + final Throwable realThrowable = (ex instanceof CompletionException) ? ex.getCause() : ex; // The ServiceUnitNotReadyException is retryable, so we should print a warning log instead of error log - if (ex instanceof BrokerServiceException.ServiceUnitNotReadyException) { + if (realThrowable instanceof BrokerServiceException.ServiceUnitNotReadyException) { log.warn("[{}] Failed to getTopic {}: {}", requestor, topicName, ex.getMessage()); topicCompletableFuture.complete(Optional.empty());