Skip to content

Commit

Permalink
change approach for overriding rtry predicates for exporters
Browse files Browse the repository at this point in the history
  • Loading branch information
YuriyHolinko committed Jan 18, 2025
1 parent a367900 commit 596a8bd
Show file tree
Hide file tree
Showing 9 changed files with 39 additions and 71 deletions.
3 changes: 2 additions & 1 deletion buildSrc/src/main/kotlin/otel.japicmp-conventions.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ val latestReleasedVersion: String by lazy {

class AllowNewAbstractMethodOnAutovalueClasses : AbstractRecordingSeenMembers() {
override fun maybeAddViolation(member: JApiCompatibility): Violation? {
val allowableAutovalueChanges = setOf(JApiCompatibilityChangeType.METHOD_ABSTRACT_ADDED_TO_CLASS, JApiCompatibilityChangeType.METHOD_ADDED_TO_PUBLIC_CLASS)
val allowableAutovalueChanges = setOf(JApiCompatibilityChangeType.METHOD_ABSTRACT_ADDED_TO_CLASS,
JApiCompatibilityChangeType.METHOD_ADDED_TO_PUBLIC_CLASS, JApiCompatibilityChangeType.ANNOTATION_ADDED)
if (member.compatibilityChanges.filter { !allowableAutovalueChanges.contains(it.type) }.isEmpty() &&
member is JApiMethod && isAutoValueClass(member.getjApiClass()))
{
Expand Down
7 changes: 1 addition & 6 deletions docs/apidiffs/current_vs_latest/opentelemetry-sdk-common.txt
Original file line number Diff line number Diff line change
@@ -1,13 +1,8 @@
Comparing source compatibility of opentelemetry-sdk-common-1.47.0-SNAPSHOT.jar against opentelemetry-sdk-common-1.46.0.jar
+++ NEW CLASS: PUBLIC(+) io.opentelemetry.sdk.common.export.DefaultRetryExceptionPredicate (not serializable)
+++ CLASS FILE FORMAT VERSION: 52.0 <- n.a.
+++ NEW INTERFACE: java.util.function.Predicate
+++ NEW SUPERCLASS: java.lang.Object
+++ NEW CONSTRUCTOR: PUBLIC(+) DefaultRetryExceptionPredicate()
+++ NEW METHOD: PUBLIC(+) boolean test(java.io.IOException)
**** MODIFIED CLASS: PUBLIC ABSTRACT io.opentelemetry.sdk.common.export.RetryPolicy (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++* NEW METHOD: PUBLIC(+) ABSTRACT(+) java.util.function.Predicate<java.io.IOException> getRetryExceptionPredicate()
+++ NEW ANNOTATION: javax.annotation.Nullable
**** MODIFIED CLASS: PUBLIC ABSTRACT STATIC io.opentelemetry.sdk.common.export.RetryPolicy$RetryPolicyBuilder (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++* NEW METHOD: PUBLIC(+) ABSTRACT(+) io.opentelemetry.sdk.common.export.RetryPolicy$RetryPolicyBuilder setRetryExceptionPredicate(java.util.function.Predicate<java.io.IOException>)
Original file line number Diff line number Diff line change
Expand Up @@ -1000,7 +1000,7 @@ void stringRepresentation() throws IOException, CertificateEncodingException {
+ ", "
+ "compressorEncoding=gzip, "
+ "headers=Headers\\{.*foo=OBFUSCATED.*\\}, "
+ "retryPolicy=RetryPolicy\\{maxAttempts=2, initialBackoff=PT0\\.05S, maxBackoff=PT3S, backoffMultiplier=1\\.3, retryExceptionPredicate=io.opentelemetry.sdk.common.export.DefaultRetryExceptionPredicate.*\\}"
+ "retryPolicy=RetryPolicy\\{maxAttempts=2, initialBackoff=PT0\\.05S, maxBackoff=PT3S, backoffMultiplier=1\\.3, retryExceptionPredicate=null.*\\}"
+ ".*" // Maybe additional grpcChannel field, signal specific fields
+ "\\}");
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -904,7 +904,7 @@ void stringRepresentation() throws IOException, CertificateEncodingException {
+ ", "
+ "exportAsJson=false, "
+ "headers=Headers\\{.*foo=OBFUSCATED.*\\}, "
+ "retryPolicy=RetryPolicy\\{maxAttempts=2, initialBackoff=PT0\\.05S, maxBackoff=PT3S, backoffMultiplier=1\\.3, retryExceptionPredicate=io.opentelemetry.sdk.common.export.DefaultRetryExceptionPredicate.*\\}"
+ "retryPolicy=RetryPolicy\\{maxAttempts=2, initialBackoff=PT0\\.05S, maxBackoff=PT3S, backoffMultiplier=1\\.3, retryExceptionPredicate=null.*\\}"
+ ".*" // Maybe additional signal specific fields
+ "\\}");
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.StringJoiner;
import java.util.concurrent.CompletableFuture;
Expand All @@ -35,6 +36,7 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand Down Expand Up @@ -68,6 +70,7 @@ public final class JdkHttpSender implements HttpSender {
private final long timeoutNanos;
private final Supplier<Map<String, List<String>>> headerSupplier;
@Nullable private final RetryPolicy retryPolicy;
private final Predicate<IOException> retryExceptionPredicate;

// Visible for testing
JdkHttpSender(
Expand All @@ -91,6 +94,10 @@ public final class JdkHttpSender implements HttpSender {
this.timeoutNanos = timeoutNanos;
this.headerSupplier = headerSupplier;
this.retryPolicy = retryPolicy;
this.retryExceptionPredicate =
Optional.ofNullable(retryPolicy)
.map(RetryPolicy::getRetryExceptionPredicate)
.orElse(JdkHttpSender::isRetryableException);
}

JdkHttpSender(
Expand Down Expand Up @@ -235,7 +242,7 @@ HttpResponse<byte[]> sendInternal(Marshaler marshaler) throws IOException {
}
}
if (exception != null) {
boolean retryable = isRetryableException(exception);
boolean retryable = retryExceptionPredicate.test(exception);
if (logger.isLoggable(Level.FINER)) {
logger.log(
Level.FINER,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.logging.Level;
import java.util.logging.Logger;
import okhttp3.Interceptor;
Expand All @@ -33,7 +34,7 @@ public final class RetryInterceptor implements Interceptor {

private final RetryPolicy retryPolicy;
private final Function<Response, Boolean> isRetryable;
private final Function<IOException, Boolean> isRetryableException;
private final Predicate<IOException> retryExceptionPredicate;
private final Sleeper sleeper;
private final BoundedLongGenerator randomLong;

Expand All @@ -42,7 +43,9 @@ public RetryInterceptor(RetryPolicy retryPolicy, Function<Response, Boolean> isR
this(
retryPolicy,
isRetryable,
e -> retryPolicy.getRetryExceptionPredicate().test(e),
retryPolicy.getRetryExceptionPredicate() == null
? RetryInterceptor::isRetryableException
: retryPolicy.getRetryExceptionPredicate(),
TimeUnit.NANOSECONDS::sleep,
bound -> ThreadLocalRandom.current().nextLong(bound));
}
Expand All @@ -51,12 +54,12 @@ public RetryInterceptor(RetryPolicy retryPolicy, Function<Response, Boolean> isR
RetryInterceptor(
RetryPolicy retryPolicy,
Function<Response, Boolean> isRetryable,
Function<IOException, Boolean> isRetryableException,
Predicate<IOException> retryExceptionPredicate,
Sleeper sleeper,
BoundedLongGenerator randomLong) {
this.retryPolicy = retryPolicy;
this.isRetryable = isRetryable;
this.isRetryableException = isRetryableException;
this.retryExceptionPredicate = retryExceptionPredicate;
this.sleeper = sleeper;
this.randomLong = randomLong;
}
Expand Down Expand Up @@ -109,7 +112,7 @@ public Response intercept(Chain chain) throws IOException {
}
}
if (exception != null) {
boolean retryable = Boolean.TRUE.equals(isRetryableException.apply(exception));
boolean retryable = Boolean.TRUE.equals(retryExceptionPredicate.test(exception));
if (logger.isLoggable(Level.FINER)) {
logger.log(
Level.FINER,
Expand Down Expand Up @@ -146,7 +149,7 @@ private static String responseStringRepresentation(Response response) {

// Visible for testing
boolean shouldRetryOnException(IOException e) {
return isRetryableException.apply(e);
return retryExceptionPredicate.test(e);
}

// Visible for testing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.testing.junit5.server.mock.MockWebServerExtension;
import io.opentelemetry.sdk.common.export.DefaultRetryExceptionPredicate;
import io.opentelemetry.sdk.common.export.RetryPolicy;
import java.io.IOException;
import java.net.ConnectException;
Expand Down Expand Up @@ -50,21 +49,19 @@ class RetryInterceptorTest {

@Mock private RetryInterceptor.Sleeper sleeper;
@Mock private RetryInterceptor.BoundedLongGenerator random;
private Predicate<IOException> retryPredicate;
private Predicate<IOException> retryExceptionPredicate;

private RetryInterceptor retrier;
private OkHttpClient client;

@BeforeEach
void setUp() {
DefaultRetryExceptionPredicate defaultRetryExceptionPredicate =
new DefaultRetryExceptionPredicate();
retryPredicate =
retryExceptionPredicate =
spy(
new Predicate<IOException>() {
@Override
public boolean test(IOException e) {
return defaultRetryExceptionPredicate.test(e)
return RetryInterceptor.isRetryableException(e)
|| (e instanceof HttpRetryException
&& e.getMessage().contains("timeout retry"));
}
Expand All @@ -76,7 +73,7 @@ public boolean test(IOException e) {
.setInitialBackoff(Duration.ofSeconds(1))
.setMaxBackoff(Duration.ofSeconds(2))
.setMaxAttempts(5)
.setRetryExceptionPredicate(retryPredicate)
.setRetryExceptionPredicate(retryExceptionPredicate)
.build();

retrier =
Expand Down Expand Up @@ -164,7 +161,7 @@ void connectTimeout() throws Exception {
client.newCall(new Request.Builder().url("http://10.255.255.1").build()).execute())
.isInstanceOf(SocketTimeoutException.class);

verify(retryPredicate, times(5)).test(any());
verify(retryExceptionPredicate, times(5)).test(any());
// Should retry maxAttempts, and sleep maxAttempts - 1 times
verify(sleeper, times(4)).sleep(anyLong());
}
Expand All @@ -184,7 +181,7 @@ void connectException() throws Exception {
.execute())
.isInstanceOfAny(ConnectException.class, SocketTimeoutException.class);

verify(retryPredicate, times(5)).test(any());
verify(retryExceptionPredicate, times(5)).test(any());
// Should retry maxAttempts, and sleep maxAttempts - 1 times
verify(sleeper, times(4)).sleep(anyLong());
}
Expand All @@ -201,15 +198,15 @@ private static int freePort() {
void nonRetryableException() throws InterruptedException {
client = connectTimeoutClient();
// Override retryPredicate so that no exception is retryable
when(retryPredicate.test(any())).thenReturn(false);
when(retryExceptionPredicate.test(any())).thenReturn(false);

// Connecting to a non-routable IP address to trigger connection timeout
assertThatThrownBy(
() ->
client.newCall(new Request.Builder().url("http://10.255.255.1").build()).execute())
.isInstanceOf(SocketTimeoutException.class);

verify(retryPredicate, times(1)).test(any());
verify(retryExceptionPredicate, times(1)).test(any());
verify(sleeper, never()).sleep(anyLong());
}

Expand All @@ -232,7 +229,7 @@ void isRetryableException() {
assertThat(retrier.shouldRetryOnException(new SocketTimeoutException("Read timed out")))
.isFalse();
// Shouldn't retry on write timeouts or other IOException
assertThat(retrier.shouldRetryOnException(new SocketTimeoutException("timeout"))).isTrue();
assertThat(retrier.shouldRetryOnException(new SocketTimeoutException("timeout"))).isFalse();
assertThat(retrier.shouldRetryOnException(new SocketTimeoutException())).isTrue();
assertThat(retrier.shouldRetryOnException(new IOException("error"))).isFalse();

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.io.IOException;
import java.time.Duration;
import java.util.function.Predicate;
import javax.annotation.Nullable;

/**
* Configuration for exporter exponential retry policy.
Expand All @@ -30,9 +31,6 @@ public abstract class RetryPolicy {

private static final double DEFAULT_BACKOFF_MULTIPLIER = 1.5;

private static final Predicate<IOException> DEFAULT_RETRY_PREDICATE =
new DefaultRetryExceptionPredicate();

private static final RetryPolicy DEFAULT = RetryPolicy.builder().build();

RetryPolicy() {}
Expand All @@ -48,8 +46,7 @@ public static RetryPolicyBuilder builder() {
.setMaxAttempts(DEFAULT_MAX_ATTEMPTS)
.setInitialBackoff(Duration.ofSeconds(DEFAULT_INITIAL_BACKOFF_SECONDS))
.setMaxBackoff(Duration.ofSeconds(DEFAULT_MAX_BACKOFF_SECONDS))
.setBackoffMultiplier(DEFAULT_BACKOFF_MULTIPLIER)
.setRetryExceptionPredicate(DEFAULT_RETRY_PREDICATE);
.setBackoffMultiplier(DEFAULT_BACKOFF_MULTIPLIER);
}

/**
Expand All @@ -72,7 +69,11 @@ public static RetryPolicyBuilder builder() {
/** Returns the backoff multiplier. */
public abstract double getBackoffMultiplier();

/** Returns the predicate if exception is retryable. */
/**
* Returns the predicate used to determine if thrown exception is retryableor {@code null} if no
* predicate was set.
*/
@Nullable
public abstract Predicate<IOException> getRetryExceptionPredicate();

/** Builder for {@link RetryPolicy}. */
Expand Down Expand Up @@ -105,10 +106,7 @@ public abstract static class RetryPolicyBuilder {
*/
public abstract RetryPolicyBuilder setBackoffMultiplier(double backoffMultiplier);

/**
* Set the predicate to determine if retry should happen based on exception. No retry by
* default.
*/
/** Set the predicate to determine if retry should happen based on exception. */
public abstract RetryPolicyBuilder setRetryExceptionPredicate(
Predicate<IOException> retryExceptionPredicate);

Expand Down

0 comments on commit 596a8bd

Please sign in to comment.