Skip to content

Commit

Permalink
[tests] Prevent test timeouts due to pulsar client hanging while shut…
Browse files Browse the repository at this point in the history
…down (streamnative#1731)

### Motivation

Sometimes tests hang during teardown because the shutdown procedure of
the broker is blocked in waiting for our PulsarClients to close.

### Modifications

Wait for some time.

I have filed this issue upstream in Pulsar in order to have a better
management of this problem
apache/pulsar#19579
  • Loading branch information
eolivelli committed Feb 28, 2023
1 parent 3d29dff commit b1f88c7
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,11 @@

import static org.apache.commons.lang3.StringUtils.isNotBlank;

import java.io.Closeable;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import lombok.Getter;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClientException;
Expand All @@ -33,15 +32,14 @@
*/
@Slf4j
@Getter
public abstract class AbstractPulsarClient implements Closeable {
public abstract class AbstractPulsarClient {

private final PulsarClientImpl pulsarClient;

public AbstractPulsarClient(@NonNull final PulsarClientImpl pulsarClient) {
this.pulsarClient = pulsarClient;
}

@Override
public void close() {
try {
pulsarClient.close();
Expand All @@ -50,13 +48,8 @@ public void close() {
}
}

protected static PulsarClientImpl createPulsarClient(final PulsarService pulsarService) {
try {
return (PulsarClientImpl) pulsarService.getClient();
} catch (PulsarServerException e) {
log.error("Failed to create PulsarClient", e);
throw new IllegalStateException(e);
}
public CompletableFuture<?> closeAsync() {
return pulsarClient.closeAsync();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,15 @@
import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationPurgatory;
import io.streamnative.pulsar.handlers.kop.utils.timer.SystemTimer;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.util.OrderedScheduler;
Expand All @@ -60,6 +65,7 @@
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.util.FutureUtil;

/**
* Kafka Protocol Handler load and run by Pulsar Service.
Expand Down Expand Up @@ -472,17 +478,34 @@ public void close() {
statsProvider.stop();
sendResponseScheduler.shutdown();

List<CompletableFuture<?>> closeHandles = new ArrayList<>();
if (offsetTopicClient != null) {
offsetTopicClient.close();
closeHandles.add(offsetTopicClient.closeAsync());
}
if (txnTopicClient != null) {
txnTopicClient.close();
closeHandles.add(txnTopicClient.closeAsync());
}
if (lookupClient != null) {
closeHandles.add(lookupClient.closeAsync());
}
if (adminManager != null) {
adminManager.shutdown();
}
if (lookupClient != null) {
lookupClient.close();

// do not block the broker forever
// see https://github.com/apache/pulsar/issues/19579
try {
FutureUtil
.waitForAll(closeHandles)
.get(Math.max(kafkaConfig.getBrokerShutdownTimeoutMs() / 10, 1000),
TimeUnit.MILLISECONDS);
} catch (ExecutionException err) {
log.warn("Error while closing some of the internal PulsarClients", err.getCause());
} catch (TimeoutException err) {
log.warn("Could not stop all the internal PulsarClients within the configured timeout");
} catch (InterruptedException err) {
Thread.currentThread().interrupt();
log.warn("Could not stop all the internal PulsarClients");
}
}

Expand Down

0 comments on commit b1f88c7

Please sign in to comment.