Skip to content

Commit

Permalink
Avoid adding a closed client to the pool (#10337)
Browse files Browse the repository at this point in the history
  • Loading branch information
flyrain authored May 15, 2024
1 parent 4c9f47d commit 2cd6d0d
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 27 deletions.
46 changes: 22 additions & 24 deletions core/src/main/java/org/apache/iceberg/ClientPoolImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.Closeable;
import java.util.ArrayDeque;
import java.util.Deque;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -70,37 +71,29 @@ public <R> R run(Action<R, C, E> action, boolean retry) throws E, InterruptedExc
try {
return action.run(client);
} catch (Exception exc) {
if (!retry || !isConnectionException(exc)) {
throw exc;
if (retry && isConnectionException(exc)) {
int retryAttempts = 0;
while (retryAttempts < maxRetries) {
try {
client = reconnect(client);
return action.run(client);
} catch (Exception e) {
if (isConnectionException(e)) {
retryAttempts++;
Thread.sleep(connectionRetryWaitPeriodMs);
} else {
throw reconnectExc.cast(exc);
}
}
}
}

return retryAction(action, exc, client);

throw exc;
} finally {
release(client);
}
}

private <R> R retryAction(Action<R, C, E> action, Exception originalFailure, C client)
throws E, InterruptedException {
int retryAttempts = 0;
while (retryAttempts < maxRetries) {
try {
C reconnectedClient = reconnect(client);
return action.run(reconnectedClient);
} catch (Exception exc) {
if (isConnectionException(exc)) {
retryAttempts++;
Thread.sleep(connectionRetryWaitPeriodMs);
} else {
throw reconnectExc.cast(originalFailure);
}
}
}

throw reconnectExc.cast(originalFailure);
}

protected abstract C newClient();

protected abstract C reconnect(C client);
Expand Down Expand Up @@ -169,6 +162,11 @@ private void release(C client) {
}
}

@VisibleForTesting
Deque<C> clients() {
return clients;
}

public int poolSize() {
return poolSize;
}
Expand Down
15 changes: 12 additions & 3 deletions core/src/test/java/org/apache/iceberg/TestClientPoolImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,16 @@ public void testRetrySucceedsWithinMaxAttempts() throws Exception {
int succeedAfterAttempts = 3;
try (MockClientPoolImpl mockClientPool =
new MockClientPoolImpl(2, RetryableException.class, true, maxRetries)) {
// initial the client pool with a client, so that we can verify the client is replaced
MockClient firstClient = mockClientPool.newClient();
mockClientPool.clients().add(firstClient);

int actions = mockClientPool.run(client -> client.succeedAfter(succeedAfterAttempts));
assertThat(actions)
.as("There should be exactly one successful action invocation")
.isEqualTo(1);
assertThat(mockClientPool.reconnectionAttempts()).isEqualTo(succeedAfterAttempts - 1);
assertThat(mockClientPool.clients().peekFirst().equals(firstClient)).isFalse();
}
}

Expand Down Expand Up @@ -78,11 +83,15 @@ static class NonRetryableException extends RuntimeException {}

static class MockClient {
boolean closed = false;

int actions = 0;

int retryableFailures = 0;

MockClient() {}

MockClient(int retryableFailures) {
this.retryableFailures = retryableFailures;
}

public void close() {
closed = true;
}
Expand Down Expand Up @@ -126,7 +135,7 @@ protected MockClient newClient() {
@Override
protected MockClient reconnect(MockClient client) {
reconnectionAttempts++;
return client;
return new MockClient(reconnectionAttempts);
}

@Override
Expand Down

0 comments on commit 2cd6d0d

Please sign in to comment.