Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry on configurable exception #6991

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ val latestReleasedVersion: String by lazy {

class AllowNewAbstractMethodOnAutovalueClasses : AbstractRecordingSeenMembers() {
override fun maybeAddViolation(member: JApiCompatibility): Violation? {
val allowableAutovalueChanges = setOf(JApiCompatibilityChangeType.METHOD_ABSTRACT_ADDED_TO_CLASS, JApiCompatibilityChangeType.METHOD_ADDED_TO_PUBLIC_CLASS)
val allowableAutovalueChanges = setOf(JApiCompatibilityChangeType.METHOD_ABSTRACT_ADDED_TO_CLASS,
JApiCompatibilityChangeType.METHOD_ADDED_TO_PUBLIC_CLASS, JApiCompatibilityChangeType.ANNOTATION_ADDED)
if (member.compatibilityChanges.filter { !allowableAutovalueChanges.contains(it.type) }.isEmpty() &&
member is JApiMethod && isAutoValueClass(member.getjApiClass()))
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,2 +1,8 @@
Comparing source compatibility of opentelemetry-sdk-common-1.47.0-SNAPSHOT.jar against opentelemetry-sdk-common-1.46.0.jar
No changes.
**** MODIFIED CLASS: PUBLIC ABSTRACT io.opentelemetry.sdk.common.export.RetryPolicy (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++* NEW METHOD: PUBLIC(+) ABSTRACT(+) java.util.function.Predicate<java.io.IOException> getRetryExceptionPredicate()
+++ NEW ANNOTATION: javax.annotation.Nullable
**** MODIFIED CLASS: PUBLIC ABSTRACT STATIC io.opentelemetry.sdk.common.export.RetryPolicy$RetryPolicyBuilder (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++* NEW METHOD: PUBLIC(+) ABSTRACT(+) io.opentelemetry.sdk.common.export.RetryPolicy$RetryPolicyBuilder setRetryExceptionPredicate(java.util.function.Predicate<java.io.IOException>)
Original file line number Diff line number Diff line change
Expand Up @@ -1050,7 +1050,7 @@ void stringRepresentation() throws IOException, CertificateEncodingException {
+ ", "
+ "compressorEncoding=gzip, "
+ "headers=Headers\\{.*foo=OBFUSCATED.*\\}, "
+ "retryPolicy=RetryPolicy\\{maxAttempts=2, initialBackoff=PT0\\.05S, maxBackoff=PT3S, backoffMultiplier=1\\.3\\}"
+ "retryPolicy=RetryPolicy\\{maxAttempts=2, initialBackoff=PT0\\.05S, maxBackoff=PT3S, backoffMultiplier=1\\.3, retryExceptionPredicate=null\\}"
+ ".*" // Maybe additional grpcChannel field, signal specific fields
+ "\\}");
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -935,7 +935,7 @@ void stringRepresentation() throws IOException, CertificateEncodingException {
+ ", "
+ "exportAsJson=false, "
+ "headers=Headers\\{.*foo=OBFUSCATED.*\\}, "
+ "retryPolicy=RetryPolicy\\{maxAttempts=2, initialBackoff=PT0\\.05S, maxBackoff=PT3S, backoffMultiplier=1\\.3\\}"
+ "retryPolicy=RetryPolicy\\{maxAttempts=2, initialBackoff=PT0\\.05S, maxBackoff=PT3S, backoffMultiplier=1\\.3, retryExceptionPredicate=null\\}"
+ ".*" // Maybe additional signal specific fields
+ "\\}");
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.StringJoiner;
import java.util.concurrent.CompletableFuture;
Expand All @@ -35,6 +36,7 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand Down Expand Up @@ -68,6 +70,7 @@ public final class JdkHttpSender implements HttpSender {
private final long timeoutNanos;
private final Supplier<Map<String, List<String>>> headerSupplier;
@Nullable private final RetryPolicy retryPolicy;
private final Predicate<IOException> retryExceptionPredicate;

// Visible for testing
JdkHttpSender(
Expand All @@ -91,6 +94,10 @@ public final class JdkHttpSender implements HttpSender {
this.timeoutNanos = timeoutNanos;
this.headerSupplier = headerSupplier;
this.retryPolicy = retryPolicy;
this.retryExceptionPredicate =
Optional.ofNullable(retryPolicy)
.map(RetryPolicy::getRetryExceptionPredicate)
.orElse(JdkHttpSender::isRetryableException);
}

JdkHttpSender(
Expand Down Expand Up @@ -235,7 +242,7 @@ HttpResponse<byte[]> sendInternal(Marshaler marshaler) throws IOException {
}
}
if (exception != null) {
boolean retryable = isRetryableException(exception);
boolean retryable = retryExceptionPredicate.test(exception);
if (logger.isLoggable(Level.FINER)) {
logger.log(
Level.FINER,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.logging.Level;
import java.util.logging.Logger;
import okhttp3.Interceptor;
Expand All @@ -33,7 +34,7 @@ public final class RetryInterceptor implements Interceptor {

private final RetryPolicy retryPolicy;
private final Function<Response, Boolean> isRetryable;
private final Function<IOException, Boolean> isRetryableException;
private final Predicate<IOException> retryExceptionPredicate;
private final Sleeper sleeper;
private final BoundedLongGenerator randomLong;

Expand All @@ -42,7 +43,9 @@ public RetryInterceptor(RetryPolicy retryPolicy, Function<Response, Boolean> isR
this(
retryPolicy,
isRetryable,
RetryInterceptor::isRetryableException,
retryPolicy.getRetryExceptionPredicate() == null
? RetryInterceptor::isRetryableException
: retryPolicy.getRetryExceptionPredicate(),
TimeUnit.NANOSECONDS::sleep,
bound -> ThreadLocalRandom.current().nextLong(bound));
}
Expand All @@ -51,12 +54,12 @@ public RetryInterceptor(RetryPolicy retryPolicy, Function<Response, Boolean> isR
RetryInterceptor(
RetryPolicy retryPolicy,
Function<Response, Boolean> isRetryable,
Function<IOException, Boolean> isRetryableException,
Predicate<IOException> retryExceptionPredicate,
Sleeper sleeper,
BoundedLongGenerator randomLong) {
this.retryPolicy = retryPolicy;
this.isRetryable = isRetryable;
this.isRetryableException = isRetryableException;
this.retryExceptionPredicate = retryExceptionPredicate;
this.sleeper = sleeper;
this.randomLong = randomLong;
}
Expand Down Expand Up @@ -109,7 +112,7 @@ public Response intercept(Chain chain) throws IOException {
}
}
if (exception != null) {
boolean retryable = Boolean.TRUE.equals(isRetryableException.apply(exception));
boolean retryable = retryExceptionPredicate.test(exception);
if (logger.isLoggable(Level.FINER)) {
logger.log(
Level.FINER,
Expand Down Expand Up @@ -144,6 +147,11 @@ private static String responseStringRepresentation(Response response) {
return joiner.toString();
}

// Visible for testing
boolean shouldRetryOnException(IOException e) {
return retryExceptionPredicate.test(e);
}

// Visible for testing
static boolean isRetryableException(IOException e) {
if (e instanceof SocketTimeoutException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,14 @@
import io.opentelemetry.sdk.common.export.RetryPolicy;
import java.io.IOException;
import java.net.ConnectException;
import java.net.HttpRetryException;
import java.net.ServerSocket;
import java.net.SocketTimeoutException;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.logging.Level;
import java.util.logging.Logger;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
Expand All @@ -48,34 +51,38 @@ class RetryInterceptorTest {

@Mock private RetryInterceptor.Sleeper sleeper;
@Mock private RetryInterceptor.BoundedLongGenerator random;
private Function<IOException, Boolean> isRetryableException;
private Predicate<IOException> retryExceptionPredicate;

private RetryInterceptor retrier;
private OkHttpClient client;

@BeforeEach
void setUp() {
// Note: cannot replace this with lambda or method reference because we need to spy on it
isRetryableException =
Logger logger = java.util.logging.Logger.getLogger(RetryInterceptor.class.getName());
logger.setLevel(Level.FINER);
retryExceptionPredicate =
spy(
new Function<IOException, Boolean>() {
new Predicate<IOException>() {
@Override
public Boolean apply(IOException exception) {
return RetryInterceptor.isRetryableException(exception);
public boolean test(IOException e) {
return RetryInterceptor.isRetryableException(e)
|| (e instanceof HttpRetryException
&& e.getMessage().contains("timeout retry"));
}
});

RetryPolicy retryPolicy =
RetryPolicy.builder()
.setBackoffMultiplier(1.6)
.setInitialBackoff(Duration.ofSeconds(1))
.setMaxBackoff(Duration.ofSeconds(2))
.setMaxAttempts(5)
.setRetryExceptionPredicate(retryExceptionPredicate)
.build();

retrier =
new RetryInterceptor(
RetryPolicy.builder()
.setBackoffMultiplier(1.6)
.setInitialBackoff(Duration.ofSeconds(1))
.setMaxBackoff(Duration.ofSeconds(2))
.setMaxAttempts(5)
.build(),
r -> !r.isSuccessful(),
isRetryableException,
sleeper,
random);
retryPolicy, r -> !r.isSuccessful(), retryExceptionPredicate, sleeper, random);
client = new OkHttpClient.Builder().addInterceptor(retrier).build();
}

Expand Down Expand Up @@ -154,7 +161,7 @@ void connectTimeout() throws Exception {
client.newCall(new Request.Builder().url("http://10.255.255.1").build()).execute())
.isInstanceOf(SocketTimeoutException.class);

verify(isRetryableException, times(5)).apply(any());
verify(retryExceptionPredicate, times(5)).test(any());
// Should retry maxAttempts, and sleep maxAttempts - 1 times
verify(sleeper, times(4)).sleep(anyLong());
}
Expand All @@ -174,7 +181,7 @@ void connectException() throws Exception {
.execute())
.isInstanceOfAny(ConnectException.class, SocketTimeoutException.class);

verify(isRetryableException, times(5)).apply(any());
verify(retryExceptionPredicate, times(5)).test(any());
// Should retry maxAttempts, and sleep maxAttempts - 1 times
verify(sleeper, times(4)).sleep(anyLong());
}
Expand All @@ -190,16 +197,16 @@ private static int freePort() {
@Test
void nonRetryableException() throws InterruptedException {
client = connectTimeoutClient();
// Override isRetryableException so that no exception is retryable
when(isRetryableException.apply(any())).thenReturn(false);
// Override retryPredicate so that no exception is retryable
when(retryExceptionPredicate.test(any())).thenReturn(false);

// Connecting to a non-routable IP address to trigger connection timeout
assertThatThrownBy(
() ->
client.newCall(new Request.Builder().url("http://10.255.255.1").build()).execute())
.isInstanceOf(SocketTimeoutException.class);

verify(isRetryableException, times(1)).apply(any());
verify(retryExceptionPredicate, times(1)).test(any());
verify(sleeper, never()).sleep(anyLong());
}

Expand All @@ -214,20 +221,51 @@ private OkHttpClient connectTimeoutClient() {
void isRetryableException() {
// Should retry on connection timeouts, where error message is "Connect timed out" or "connect
// timed out"
assertThat(
RetryInterceptor.isRetryableException(new SocketTimeoutException("Connect timed out")))
assertThat(retrier.shouldRetryOnException(new SocketTimeoutException("Connect timed out")))
.isTrue();
assertThat(
RetryInterceptor.isRetryableException(new SocketTimeoutException("connect timed out")))
assertThat(retrier.shouldRetryOnException(new SocketTimeoutException("connect timed out")))
.isTrue();
// Shouldn't retry on read timeouts, where error message is "Read timed out"
assertThat(RetryInterceptor.isRetryableException(new SocketTimeoutException("Read timed out")))
assertThat(retrier.shouldRetryOnException(new SocketTimeoutException("Read timed out")))
.isFalse();
// Shouldn't retry on write timeouts, where error message is "timeout", or other IOException
assertThat(RetryInterceptor.isRetryableException(new SocketTimeoutException("timeout")))
// Shouldn't retry on write timeouts or other IOException
assertThat(retrier.shouldRetryOnException(new SocketTimeoutException("timeout"))).isFalse();
assertThat(retrier.shouldRetryOnException(new SocketTimeoutException())).isTrue();
assertThat(retrier.shouldRetryOnException(new IOException("error"))).isFalse();

// Testing configured predicate
assertThat(retrier.shouldRetryOnException(new HttpRetryException("error", 400))).isFalse();
assertThat(retrier.shouldRetryOnException(new HttpRetryException("timeout retry", 400)))
.isTrue();
}

@Test
void isRetryableExceptionDefaultBehaviour() {
RetryInterceptor retryInterceptor =
new RetryInterceptor(RetryPolicy.getDefault(), OkHttpHttpSender::isRetryable);
assertThat(
retryInterceptor.shouldRetryOnException(
new SocketTimeoutException("Connect timed out")))
.isTrue();
assertThat(retryInterceptor.shouldRetryOnException(new IOException("Connect timed out")))
.isFalse();
}

@Test
void isRetryableExceptionCustomRetryPredicate() {
RetryInterceptor retryInterceptor =
new RetryInterceptor(
RetryPolicy.builder()
.setRetryExceptionPredicate((IOException e) -> e.getMessage().equals("retry"))
.build(),
OkHttpHttpSender::isRetryable);

assertThat(retryInterceptor.shouldRetryOnException(new IOException("some message"))).isFalse();
assertThat(retryInterceptor.shouldRetryOnException(new IOException("retry"))).isTrue();
assertThat(
retryInterceptor.shouldRetryOnException(
new SocketTimeoutException("Connect timed out")))
.isFalse();
assertThat(RetryInterceptor.isRetryableException(new SocketTimeoutException())).isTrue();
assertThat(RetryInterceptor.isRetryableException(new IOException("error"))).isFalse();
}

private Response sendRequest() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
import static io.opentelemetry.api.internal.Utils.checkArgument;

import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.time.Duration;
import java.util.function.Predicate;
import javax.annotation.Nullable;

/**
* Configuration for exporter exponential retry policy.
Expand Down Expand Up @@ -66,6 +69,13 @@ public static RetryPolicyBuilder builder() {
/** Returns the backoff multiplier. */
public abstract double getBackoffMultiplier();

/**
* Returns the predicate used to determine if thrown exception is retryableor {@code null} if no
* predicate was set.
*/
@Nullable
public abstract Predicate<IOException> getRetryExceptionPredicate();
jack-berg marked this conversation as resolved.
Show resolved Hide resolved

/** Builder for {@link RetryPolicy}. */
@AutoValue.Builder
public abstract static class RetryPolicyBuilder {
Expand Down Expand Up @@ -96,6 +106,10 @@ public abstract static class RetryPolicyBuilder {
*/
public abstract RetryPolicyBuilder setBackoffMultiplier(double backoffMultiplier);

/** Set the predicate to determine if retry should happen based on exception. */
public abstract RetryPolicyBuilder setRetryExceptionPredicate(
Predicate<IOException> retryExceptionPredicate);
Comment on lines +110 to +111
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

did you consider a customizer instead, to allow choice of either replacing or enhancing the default predicate?

I realize it's not the friendliest API, but we've found the pattern really useful in AutoConfigurationCustomizer

Suggested change
public abstract RetryPolicyBuilder setRetryExceptionPredicate(
Predicate<IOException> retryExceptionPredicate);
public abstract RetryPolicyBuilder setRetryExceptionPredicateCustomizer(
Fuction<Predicate<IOException>, Predicate<IOException>> retryExceptionCustomizer);

Copy link
Author

@YuriyHolinko YuriyHolinko Jan 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and later use something like this

    static class MyFunction implements Function<Predicate<IOException>, Predicate<IOException>> {
        private Predicate<IOException> newPredicate = e -> {
            return false; // logic for  my retryable condition
        };

        @Override
        public Predicate<IOException> apply(Predicate<IOException> defaultPredicate) {
            /**
             * use this statement to extend
             */
            return newPredicate.or(defaultPredicate); // extend


            /**
             * or this to override
             */
            return newPredicate;
        }
    }

it depends how many users will want to extend it. I started the PR quickly with a thought that I need to extend rertry policy, but after some time I realized if a user wants to change the default policy it's ok just to override.

but pls tell me if you think it's nice to have the enhance option for it and I will cover it in this PR and next PR.
Please note that I also have a plan to change the defaults for retryable exception.


abstract RetryPolicy autoBuild();

/** Build and return a {@link RetryPolicy} with the values of this builder. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ void build() {
assertThat(retryPolicy.getInitialBackoff()).isEqualTo(Duration.ofMillis(2));
assertThat(retryPolicy.getMaxBackoff()).isEqualTo(Duration.ofSeconds(1));
assertThat(retryPolicy.getBackoffMultiplier()).isEqualTo(1.1);
assertThat(retryPolicy.getRetryExceptionPredicate()).isEqualTo(null);
}

@Test
Expand Down
Loading