From 58a88982222fb791a54e08302a5e9cad2e037dc3 Mon Sep 17 00:00:00 2001 From: nscuro Date: Fri, 28 Jul 2023 13:34:08 +0200 Subject: [PATCH] Port Kafka Streams exception handler test flakiness fix From https://github.com/DependencyTrack/hyades-apiserver/pull/251 Signed-off-by: nscuro --- ...bstractThresholdBasedExceptionHandler.java | 8 +++++-- .../DeserializationExceptionHandler.java | 5 ++-- .../exception/ProcessingExceptionHandler.java | 9 +++++-- .../exception/ProductionExceptionHandler.java | 6 +++-- .../DeserializationExceptionHandlerTest.java | 19 +++++++++------ .../ProcessingExceptionHandlerTest.java | 24 ++++++++++++------- .../ProductionExceptionHandlerTest.java | 16 ++++++++++--- 7 files changed, 60 insertions(+), 27 deletions(-) diff --git a/commons-kstreams/src/main/java/org/hyades/kstreams/exception/AbstractThresholdBasedExceptionHandler.java b/commons-kstreams/src/main/java/org/hyades/kstreams/exception/AbstractThresholdBasedExceptionHandler.java index 7ca1ca6a9..54610accb 100644 --- a/commons-kstreams/src/main/java/org/hyades/kstreams/exception/AbstractThresholdBasedExceptionHandler.java +++ b/commons-kstreams/src/main/java/org/hyades/kstreams/exception/AbstractThresholdBasedExceptionHandler.java @@ -2,17 +2,20 @@ import org.hyades.kstreams.exception.ExceptionHandlerConfig.ThresholdConfig; +import java.time.Clock; import java.time.Duration; import java.time.Instant; abstract class AbstractThresholdBasedExceptionHandler { + private final Clock clock; private final Duration exceptionThresholdInterval; private final int exceptionThresholdCount; private Instant firstExceptionOccurredAt; private int exceptionOccurrences; AbstractThresholdBasedExceptionHandler(final ThresholdConfig config) { + this.clock = Clock.systemUTC(); if (config != null) { this.exceptionThresholdInterval = config.interval(); this.exceptionThresholdCount = config.count(); @@ -22,13 +25,14 @@ abstract class AbstractThresholdBasedExceptionHandler { } } - AbstractThresholdBasedExceptionHandler(final Duration exceptionThresholdInterval, final int exceptionThresholdCount) { + AbstractThresholdBasedExceptionHandler(final Clock clock, final Duration exceptionThresholdInterval, final int exceptionThresholdCount) { + this.clock = clock; this.exceptionThresholdInterval = exceptionThresholdInterval; this.exceptionThresholdCount = exceptionThresholdCount; } boolean exceedsThreshold() { - final Instant now = Instant.now(); + final Instant now = Instant.now(clock); if (firstExceptionOccurredAt == null) { firstExceptionOccurredAt = now; exceptionOccurrences = 1; diff --git a/commons-kstreams/src/main/java/org/hyades/kstreams/exception/DeserializationExceptionHandler.java b/commons-kstreams/src/main/java/org/hyades/kstreams/exception/DeserializationExceptionHandler.java index f9e5d8adb..63f2d179f 100644 --- a/commons-kstreams/src/main/java/org/hyades/kstreams/exception/DeserializationExceptionHandler.java +++ b/commons-kstreams/src/main/java/org/hyades/kstreams/exception/DeserializationExceptionHandler.java @@ -7,6 +7,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.Clock; import java.time.Duration; import java.util.Map; @@ -27,8 +28,8 @@ public DeserializationExceptionHandler() { .orElse(null)); } - DeserializationExceptionHandler(final Duration exceptionThresholdInterval, final int exceptionThresholdCount) { - super(exceptionThresholdInterval, exceptionThresholdCount); + DeserializationExceptionHandler(final Clock clock, final Duration exceptionThresholdInterval, final int exceptionThresholdCount) { + super(clock, exceptionThresholdInterval, exceptionThresholdCount); } /** diff --git a/commons-kstreams/src/main/java/org/hyades/kstreams/exception/ProcessingExceptionHandler.java b/commons-kstreams/src/main/java/org/hyades/kstreams/exception/ProcessingExceptionHandler.java index 181df2695..e8e7781e2 100644 --- a/commons-kstreams/src/main/java/org/hyades/kstreams/exception/ProcessingExceptionHandler.java +++ b/commons-kstreams/src/main/java/org/hyades/kstreams/exception/ProcessingExceptionHandler.java @@ -6,6 +6,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.Clock; import java.time.Duration; import java.time.Instant; import java.util.Map; @@ -30,6 +31,7 @@ private record ExceptionOccurrence(Instant occurredFirstAt, int count) { "org.hibernate.QueryTimeoutException" ); + private final Clock clock; private final Map, ExceptionOccurrence> transientExceptionOccurrences; private final Duration transientExceptionThresholdInterval; private final int transientExceptionThresholdCount; @@ -37,13 +39,16 @@ private record ExceptionOccurrence(Instant occurredFirstAt, int count) { @SuppressWarnings("unused") ProcessingExceptionHandler(final ExceptionHandlerConfig config) { + this.clock = Clock.systemUTC(); this.transientExceptionOccurrences = new ConcurrentHashMap<>(); this.transientExceptionThresholdInterval = config.thresholds().processing().interval(); this.transientExceptionThresholdCount = config.thresholds().processing().count(); } - ProcessingExceptionHandler(final Duration transientExceptionThresholdInterval, + ProcessingExceptionHandler(final Clock clock, + final Duration transientExceptionThresholdInterval, final int transientExceptionThresholdCount) { + this.clock = clock; this.transientExceptionOccurrences = new ConcurrentHashMap<>(); this.transientExceptionThresholdInterval = transientExceptionThresholdInterval; this.transientExceptionThresholdCount = transientExceptionThresholdCount; @@ -59,7 +64,7 @@ public StreamThreadExceptionResponse handle(final Throwable throwable) { if (isTransient(rootCause)) { final ExceptionOccurrence occurrence = transientExceptionOccurrences .compute(rootCause.getClass(), (key, oldValue) -> { - final Instant now = Instant.now(); + final Instant now = Instant.now(clock); if (oldValue == null) { return new ExceptionOccurrence(now, 1); } diff --git a/commons-kstreams/src/main/java/org/hyades/kstreams/exception/ProductionExceptionHandler.java b/commons-kstreams/src/main/java/org/hyades/kstreams/exception/ProductionExceptionHandler.java index a33df4f38..b85d2fe7f 100644 --- a/commons-kstreams/src/main/java/org/hyades/kstreams/exception/ProductionExceptionHandler.java +++ b/commons-kstreams/src/main/java/org/hyades/kstreams/exception/ProductionExceptionHandler.java @@ -7,6 +7,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.Clock; import java.time.Duration; import java.util.Map; @@ -26,9 +27,10 @@ public ProductionExceptionHandler() { .orElse(null)); } - ProductionExceptionHandler(final Duration exceptionThresholdInterval, + ProductionExceptionHandler(final Clock clock, + final Duration exceptionThresholdInterval, final int exceptionThresholdCount) { - super(exceptionThresholdInterval, exceptionThresholdCount); + super(clock, exceptionThresholdInterval, exceptionThresholdCount); } /** diff --git a/commons-kstreams/src/test/java/org/hyades/kstreams/exception/DeserializationExceptionHandlerTest.java b/commons-kstreams/src/test/java/org/hyades/kstreams/exception/DeserializationExceptionHandlerTest.java index 3c814be36..e2dbffdec 100644 --- a/commons-kstreams/src/test/java/org/hyades/kstreams/exception/DeserializationExceptionHandlerTest.java +++ b/commons-kstreams/src/test/java/org/hyades/kstreams/exception/DeserializationExceptionHandlerTest.java @@ -6,11 +6,13 @@ import org.apache.kafka.streams.processor.ProcessorContext; import org.junit.jupiter.api.Test; +import java.time.Clock; import java.time.Duration; +import java.time.Instant; import static org.assertj.core.api.Assertions.assertThat; -import static org.awaitility.Awaitility.await; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; class DeserializationExceptionHandlerTest { @@ -18,7 +20,7 @@ class DeserializationExceptionHandlerTest { void testHandle() { final var record = new ConsumerRecord<>("topic", 6, 3, "key".getBytes(), "value".getBytes()); final var processorContext = mock(ProcessorContext.class); - final var handler = new DeserializationExceptionHandler(Duration.ofMinutes(5), 10); + final var handler = new DeserializationExceptionHandler(Clock.systemUTC(), Duration.ofMinutes(5), 10); for (int i = 0; i < 9; i++) { assertThat(handler.handle(processorContext, record, new SerializationException())).isEqualTo(DeserializationHandlerResponse.CONTINUE); @@ -29,16 +31,19 @@ final var record = new ConsumerRecord<>("topic", 6, 3, "key".getBytes(), "value" @Test void testHandleWithThresholdReset() { + final var clockMock = mock(Clock.class); + when(clockMock.instant()) + .thenReturn(Instant.EPOCH) + .thenReturn(Instant.EPOCH.plusMillis(250)) + .thenReturn(Instant.EPOCH.plusSeconds(1).plusMillis(251)); + final var record = new ConsumerRecord<>("topic", 6, 3, "key".getBytes(), "value".getBytes()); final var processorContext = mock(ProcessorContext.class); - final var handler = new DeserializationExceptionHandler(Duration.ofMillis(250), 2); + final var handler = new DeserializationExceptionHandler(clockMock, Duration.ofSeconds(1), 2); assertThat(handler.handle(processorContext, record, new SerializationException())).isEqualTo(DeserializationHandlerResponse.CONTINUE); assertThat(handler.handle(processorContext, record, new SerializationException())).isEqualTo(DeserializationHandlerResponse.FAIL); - - await() - .atMost(Duration.ofMillis(500)) - .untilAsserted(() -> assertThat(handler.handle(processorContext, record, new SerializationException())).isEqualTo(DeserializationHandlerResponse.CONTINUE)); + assertThat(handler.handle(processorContext, record, new SerializationException())).isEqualTo(DeserializationHandlerResponse.CONTINUE); } } \ No newline at end of file diff --git a/commons-kstreams/src/test/java/org/hyades/kstreams/exception/ProcessingExceptionHandlerTest.java b/commons-kstreams/src/test/java/org/hyades/kstreams/exception/ProcessingExceptionHandlerTest.java index 67ac76033..c7cde9291 100644 --- a/commons-kstreams/src/test/java/org/hyades/kstreams/exception/ProcessingExceptionHandlerTest.java +++ b/commons-kstreams/src/test/java/org/hyades/kstreams/exception/ProcessingExceptionHandlerTest.java @@ -3,29 +3,32 @@ import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse; import org.junit.jupiter.api.Test; +import java.time.Clock; import java.time.Duration; +import java.time.Instant; import java.util.concurrent.TimeoutException; import static org.assertj.core.api.Assertions.assertThat; -import static org.awaitility.Awaitility.await; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; class ProcessingExceptionHandlerTest { @Test void testHandleWithTransientError() { - final var handler = new ProcessingExceptionHandler(Duration.ofMinutes(5), 10); + final var handler = new ProcessingExceptionHandler(Clock.systemUTC(), Duration.ofMinutes(5), 10); assertThat(handler.handle(new TimeoutException())).isEqualTo(StreamThreadExceptionResponse.REPLACE_THREAD); } @Test void testHandleWithNonTransientError() { - final var handler = new ProcessingExceptionHandler(Duration.ofMinutes(5), 10); + final var handler = new ProcessingExceptionHandler(Clock.systemUTC(), Duration.ofMinutes(5), 10); assertThat(handler.handle(new IllegalStateException())).isEqualTo(StreamThreadExceptionResponse.SHUTDOWN_CLIENT); } @Test void testHandleWithTransientErrorExceedingThreshold() { - final var handler = new ProcessingExceptionHandler(Duration.ofMinutes(5), 10); + final var handler = new ProcessingExceptionHandler(Clock.systemUTC(), Duration.ofMinutes(5), 10); for (int i = 0; i < 9; i++) { assertThat(handler.handle(new TimeoutException())).isEqualTo(StreamThreadExceptionResponse.REPLACE_THREAD); @@ -36,14 +39,17 @@ void testHandleWithTransientErrorExceedingThreshold() { @Test void testHandleWithTransientErrorThresholdReset() { - final var handler = new ProcessingExceptionHandler(Duration.ofMillis(250), 2); + final var clockMock = mock(Clock.class); + when(clockMock.instant()) + .thenReturn(Instant.EPOCH) + .thenReturn(Instant.EPOCH.plusMillis(250)) + .thenReturn(Instant.EPOCH.plusSeconds(1).plusMillis(251)); + + final var handler = new ProcessingExceptionHandler(clockMock, Duration.ofSeconds(1), 2); assertThat(handler.handle(new TimeoutException())).isEqualTo(StreamThreadExceptionResponse.REPLACE_THREAD); assertThat(handler.handle(new TimeoutException())).isEqualTo(StreamThreadExceptionResponse.SHUTDOWN_CLIENT); - - await() - .atMost(Duration.ofMillis(500)) - .untilAsserted(() -> assertThat(handler.handle(new TimeoutException())).isEqualTo(StreamThreadExceptionResponse.REPLACE_THREAD)); + assertThat(handler.handle(new TimeoutException())).isEqualTo(StreamThreadExceptionResponse.REPLACE_THREAD); } } \ No newline at end of file diff --git a/commons-kstreams/src/test/java/org/hyades/kstreams/exception/ProductionExceptionHandlerTest.java b/commons-kstreams/src/test/java/org/hyades/kstreams/exception/ProductionExceptionHandlerTest.java index ef70d02d9..25fafeb6c 100644 --- a/commons-kstreams/src/test/java/org/hyades/kstreams/exception/ProductionExceptionHandlerTest.java +++ b/commons-kstreams/src/test/java/org/hyades/kstreams/exception/ProductionExceptionHandlerTest.java @@ -5,17 +5,21 @@ import org.apache.kafka.streams.errors.ProductionExceptionHandler.ProductionExceptionHandlerResponse; import org.junit.jupiter.api.Test; +import java.time.Clock; import java.time.Duration; +import java.time.Instant; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; class ProductionExceptionHandlerTest { @Test void testHandle() { final var record = new ProducerRecord<>("topic", 6, "key".getBytes(), "value".getBytes()); - final var handler = new ProductionExceptionHandler(Duration.ofMinutes(5), 10); + final var handler = new ProductionExceptionHandler(Clock.systemUTC(), Duration.ofMinutes(5), 10); for (int i = 0; i < 9; i++) { assertThat(handler.handle(record, new RecordTooLargeException())).isEqualTo(ProductionExceptionHandlerResponse.CONTINUE); @@ -26,8 +30,14 @@ final var record = new ProducerRecord<>("topic", 6, "key".getBytes(), "value".ge @Test void testHandleWithThresholdReset() { + final var clockMock = mock(Clock.class); + when(clockMock.instant()) + .thenReturn(Instant.EPOCH) + .thenReturn(Instant.EPOCH.plusMillis(250)) + .thenReturn(Instant.EPOCH.plusSeconds(1).plusMillis(251)); + final var record = new ProducerRecord<>("topic", 6, "key".getBytes(), "value".getBytes()); - final var handler = new ProductionExceptionHandler(Duration.ofMillis(250), 2); + final var handler = new ProductionExceptionHandler(clockMock, Duration.ofSeconds(1), 2); assertThat(handler.handle(record, new RecordTooLargeException())).isEqualTo(ProductionExceptionHandlerResponse.CONTINUE); assertThat(handler.handle(record, new RecordTooLargeException())).isEqualTo(ProductionExceptionHandlerResponse.FAIL); @@ -40,7 +50,7 @@ final var record = new ProducerRecord<>("topic", 6, "key".getBytes(), "value".ge @Test void testHandleWithUnexpectedException() { final var record = new ProducerRecord<>("topic", 6, "key".getBytes(), "value".getBytes()); - final var handler = new ProductionExceptionHandler(Duration.ofMinutes(5), 10); + final var handler = new ProductionExceptionHandler(Clock.systemUTC(), Duration.ofMinutes(5), 10); assertThat(handler.handle(record, new IllegalStateException())).isEqualTo(ProductionExceptionHandlerResponse.FAIL); }