diff --git a/core/src/main/java/org/apache/iceberg/ClientPoolImpl.java b/core/src/main/java/org/apache/iceberg/ClientPoolImpl.java index 4c4469544816..c3534fa22a4a 100644 --- a/core/src/main/java/org/apache/iceberg/ClientPoolImpl.java +++ b/core/src/main/java/org/apache/iceberg/ClientPoolImpl.java @@ -21,6 +21,7 @@ import java.io.Closeable; import java.util.ArrayDeque; import java.util.Deque; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,37 +71,29 @@ public R run(Action action, boolean retry) throws E, InterruptedExc try { return action.run(client); } catch (Exception exc) { - if (!retry || !isConnectionException(exc)) { - throw exc; + if (retry && isConnectionException(exc)) { + int retryAttempts = 0; + while (retryAttempts < maxRetries) { + try { + client = reconnect(client); + return action.run(client); + } catch (Exception e) { + if (isConnectionException(e)) { + retryAttempts++; + Thread.sleep(connectionRetryWaitPeriodMs); + } else { + throw reconnectExc.cast(exc); + } + } + } } - return retryAction(action, exc, client); - + throw exc; } finally { release(client); } } - private R retryAction(Action action, Exception originalFailure, C client) - throws E, InterruptedException { - int retryAttempts = 0; - while (retryAttempts < maxRetries) { - try { - C reconnectedClient = reconnect(client); - return action.run(reconnectedClient); - } catch (Exception exc) { - if (isConnectionException(exc)) { - retryAttempts++; - Thread.sleep(connectionRetryWaitPeriodMs); - } else { - throw reconnectExc.cast(originalFailure); - } - } - } - - throw reconnectExc.cast(originalFailure); - } - protected abstract C newClient(); protected abstract C reconnect(C client); @@ -169,6 +162,11 @@ private void release(C client) { } } + @VisibleForTesting + Deque clients() { + return clients; + } + public int poolSize() { return poolSize; } diff --git a/core/src/test/java/org/apache/iceberg/TestClientPoolImpl.java b/core/src/test/java/org/apache/iceberg/TestClientPoolImpl.java index 8d62afa17678..3a6666bac38b 100644 --- a/core/src/test/java/org/apache/iceberg/TestClientPoolImpl.java +++ b/core/src/test/java/org/apache/iceberg/TestClientPoolImpl.java @@ -31,11 +31,16 @@ public void testRetrySucceedsWithinMaxAttempts() throws Exception { int succeedAfterAttempts = 3; try (MockClientPoolImpl mockClientPool = new MockClientPoolImpl(2, RetryableException.class, true, maxRetries)) { + // initial the client pool with a client, so that we can verify the client is replaced + MockClient firstClient = mockClientPool.newClient(); + mockClientPool.clients().add(firstClient); + int actions = mockClientPool.run(client -> client.succeedAfter(succeedAfterAttempts)); assertThat(actions) .as("There should be exactly one successful action invocation") .isEqualTo(1); assertThat(mockClientPool.reconnectionAttempts()).isEqualTo(succeedAfterAttempts - 1); + assertThat(mockClientPool.clients().peekFirst().equals(firstClient)).isFalse(); } } @@ -78,11 +83,15 @@ static class NonRetryableException extends RuntimeException {} static class MockClient { boolean closed = false; - int actions = 0; - int retryableFailures = 0; + MockClient() {} + + MockClient(int retryableFailures) { + this.retryableFailures = retryableFailures; + } + public void close() { closed = true; } @@ -126,7 +135,7 @@ protected MockClient newClient() { @Override protected MockClient reconnect(MockClient client) { reconnectionAttempts++; - return client; + return new MockClient(reconnectionAttempts); } @Override