Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Commit

Permalink
Fix topic lookup failed exceptionally with ServiceUnitNotReady
Browse files Browse the repository at this point in the history
### Motivation

`testMultiBrokerUnloadReload` is flaky with the Pulsar dependency
upgraded to 3.1.0-SNAPSHOT. Here are the related logs:

```
02:37:33.275 [pulsar-ph-kafka-644-3:io.streamnative.pulsar.handlers.kop.KafkaTopicLookupService@105] ERROR io.streamnative.pulsar.handlers.kop.KafkaTopicLookupService - [io.streamnative.pulsar.handlers.kop.storage.PartitionLog@131e450c] Failed t
o getTopic persistent://public/default/kopMultiBrokerUnloadReload10-partition-3. exception:
java.util.concurrent.CompletionException: org.apache.pulsar.broker.service.BrokerServiceException$ServiceUnitNotReadyException:
```

It's because the exception passed from `BrokerService#getTopic` is
`CompletionException`, not `ServiceUnitNotReadyException`, while KoP
does not get the cause of a `CompletionException` in
`handleGetTopicException`. And then a `UNKNOWN_SERVER_ERROR` is returned
to the client.

```
02:37:33.277 [pulsar-ph-kafka-644-3:io.streamnative.pulsar.handlers.kop.storage.ReplicaManager$PendingProduceCallback@121] DEBUG io.streamnative.pulsar.handlers.kop.storage.ReplicaManager - Complete handle appendRecords. {kopMultiBrokerUnloadReload10-3={error: UNKNOWN_SERVER_ERROR,offset: -1,logAppendTime: -1, logStartOffset: -1, recordErrors: [], errorMessage: null}}
```

### Modifications

Unwrap the `CompletionException` in `handleGetTopicException`.
  • Loading branch information
BewareMyPower committed Aug 9, 2023
1 parent 26980a3 commit 300a8d5
Showing 1 changed file with 3 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.service.BrokerService;
Expand Down Expand Up @@ -96,8 +97,9 @@ private void handleGetTopicException(@NonNull final String topicName,
final CompletableFuture<Optional<PersistentTopic>> topicCompletableFuture,
@NonNull final Throwable ex,
@NonNull final Object requestor) {
final Throwable realThrowable = (ex instanceof CompletionException) ? ex.getCause() : ex;
// The ServiceUnitNotReadyException is retryable, so we should print a warning log instead of error log
if (ex instanceof BrokerServiceException.ServiceUnitNotReadyException) {
if (realThrowable instanceof BrokerServiceException.ServiceUnitNotReadyException) {
log.warn("[{}] Failed to getTopic {}: {}",
requestor, topicName, ex.getMessage());
topicCompletableFuture.complete(Optional.empty());
Expand Down

0 comments on commit 300a8d5

Please sign in to comment.