Skip to content

Commit

Permalink
Merge pull request #703 from DependencyTrack/port-test-flakiness-fix
Browse files Browse the repository at this point in the history
Port Kafka Streams exception handler test flakiness fix
  • Loading branch information
nscuro authored Jul 31, 2023
2 parents 8a2060a + 58a8898 commit 5ea4628
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,20 @@

import org.hyades.kstreams.exception.ExceptionHandlerConfig.ThresholdConfig;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;

abstract class AbstractThresholdBasedExceptionHandler {

private final Clock clock;
private final Duration exceptionThresholdInterval;
private final int exceptionThresholdCount;
private Instant firstExceptionOccurredAt;
private int exceptionOccurrences;

AbstractThresholdBasedExceptionHandler(final ThresholdConfig config) {
this.clock = Clock.systemUTC();
if (config != null) {
this.exceptionThresholdInterval = config.interval();
this.exceptionThresholdCount = config.count();
Expand All @@ -22,13 +25,14 @@ abstract class AbstractThresholdBasedExceptionHandler {
}
}

AbstractThresholdBasedExceptionHandler(final Duration exceptionThresholdInterval, final int exceptionThresholdCount) {
AbstractThresholdBasedExceptionHandler(final Clock clock, final Duration exceptionThresholdInterval, final int exceptionThresholdCount) {
this.clock = clock;
this.exceptionThresholdInterval = exceptionThresholdInterval;
this.exceptionThresholdCount = exceptionThresholdCount;
}

boolean exceedsThreshold() {
final Instant now = Instant.now();
final Instant now = Instant.now(clock);
if (firstExceptionOccurredAt == null) {
firstExceptionOccurredAt = now;
exceptionOccurrences = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Clock;
import java.time.Duration;
import java.util.Map;

Expand All @@ -27,8 +28,8 @@ public DeserializationExceptionHandler() {
.orElse(null));
}

DeserializationExceptionHandler(final Duration exceptionThresholdInterval, final int exceptionThresholdCount) {
super(exceptionThresholdInterval, exceptionThresholdCount);
DeserializationExceptionHandler(final Clock clock, final Duration exceptionThresholdInterval, final int exceptionThresholdCount) {
super(clock, exceptionThresholdInterval, exceptionThresholdCount);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
Expand All @@ -30,20 +31,24 @@ private record ExceptionOccurrence(Instant occurredFirstAt, int count) {
"org.hibernate.QueryTimeoutException"
);

private final Clock clock;
private final Map<Class<? extends Throwable>, ExceptionOccurrence> transientExceptionOccurrences;
private final Duration transientExceptionThresholdInterval;
private final int transientExceptionThresholdCount;


@SuppressWarnings("unused")
ProcessingExceptionHandler(final ExceptionHandlerConfig config) {
this.clock = Clock.systemUTC();
this.transientExceptionOccurrences = new ConcurrentHashMap<>();
this.transientExceptionThresholdInterval = config.thresholds().processing().interval();
this.transientExceptionThresholdCount = config.thresholds().processing().count();
}

ProcessingExceptionHandler(final Duration transientExceptionThresholdInterval,
ProcessingExceptionHandler(final Clock clock,
final Duration transientExceptionThresholdInterval,
final int transientExceptionThresholdCount) {
this.clock = clock;
this.transientExceptionOccurrences = new ConcurrentHashMap<>();
this.transientExceptionThresholdInterval = transientExceptionThresholdInterval;
this.transientExceptionThresholdCount = transientExceptionThresholdCount;
Expand All @@ -59,7 +64,7 @@ public StreamThreadExceptionResponse handle(final Throwable throwable) {
if (isTransient(rootCause)) {
final ExceptionOccurrence occurrence = transientExceptionOccurrences
.compute(rootCause.getClass(), (key, oldValue) -> {
final Instant now = Instant.now();
final Instant now = Instant.now(clock);
if (oldValue == null) {
return new ExceptionOccurrence(now, 1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Clock;
import java.time.Duration;
import java.util.Map;

Expand All @@ -26,9 +27,10 @@ public ProductionExceptionHandler() {
.orElse(null));
}

ProductionExceptionHandler(final Duration exceptionThresholdInterval,
ProductionExceptionHandler(final Clock clock,
final Duration exceptionThresholdInterval,
final int exceptionThresholdCount) {
super(exceptionThresholdInterval, exceptionThresholdCount);
super(clock, exceptionThresholdInterval, exceptionThresholdCount);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,21 @@
import org.apache.kafka.streams.processor.ProcessorContext;
import org.junit.jupiter.api.Test;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class DeserializationExceptionHandlerTest {

@Test
void testHandle() {
final var record = new ConsumerRecord<>("topic", 6, 3, "key".getBytes(), "value".getBytes());
final var processorContext = mock(ProcessorContext.class);
final var handler = new DeserializationExceptionHandler(Duration.ofMinutes(5), 10);
final var handler = new DeserializationExceptionHandler(Clock.systemUTC(), Duration.ofMinutes(5), 10);

for (int i = 0; i < 9; i++) {
assertThat(handler.handle(processorContext, record, new SerializationException())).isEqualTo(DeserializationHandlerResponse.CONTINUE);
Expand All @@ -29,16 +31,19 @@ final var record = new ConsumerRecord<>("topic", 6, 3, "key".getBytes(), "value"

@Test
void testHandleWithThresholdReset() {
final var clockMock = mock(Clock.class);
when(clockMock.instant())
.thenReturn(Instant.EPOCH)
.thenReturn(Instant.EPOCH.plusMillis(250))
.thenReturn(Instant.EPOCH.plusSeconds(1).plusMillis(251));

final var record = new ConsumerRecord<>("topic", 6, 3, "key".getBytes(), "value".getBytes());
final var processorContext = mock(ProcessorContext.class);
final var handler = new DeserializationExceptionHandler(Duration.ofMillis(250), 2);
final var handler = new DeserializationExceptionHandler(clockMock, Duration.ofSeconds(1), 2);

assertThat(handler.handle(processorContext, record, new SerializationException())).isEqualTo(DeserializationHandlerResponse.CONTINUE);
assertThat(handler.handle(processorContext, record, new SerializationException())).isEqualTo(DeserializationHandlerResponse.FAIL);

await()
.atMost(Duration.ofMillis(500))
.untilAsserted(() -> assertThat(handler.handle(processorContext, record, new SerializationException())).isEqualTo(DeserializationHandlerResponse.CONTINUE));
assertThat(handler.handle(processorContext, record, new SerializationException())).isEqualTo(DeserializationHandlerResponse.CONTINUE);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,32 @@
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;
import org.junit.jupiter.api.Test;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.TimeoutException;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class ProcessingExceptionHandlerTest {

@Test
void testHandleWithTransientError() {
final var handler = new ProcessingExceptionHandler(Duration.ofMinutes(5), 10);
final var handler = new ProcessingExceptionHandler(Clock.systemUTC(), Duration.ofMinutes(5), 10);
assertThat(handler.handle(new TimeoutException())).isEqualTo(StreamThreadExceptionResponse.REPLACE_THREAD);
}

@Test
void testHandleWithNonTransientError() {
final var handler = new ProcessingExceptionHandler(Duration.ofMinutes(5), 10);
final var handler = new ProcessingExceptionHandler(Clock.systemUTC(), Duration.ofMinutes(5), 10);
assertThat(handler.handle(new IllegalStateException())).isEqualTo(StreamThreadExceptionResponse.SHUTDOWN_CLIENT);
}

@Test
void testHandleWithTransientErrorExceedingThreshold() {
final var handler = new ProcessingExceptionHandler(Duration.ofMinutes(5), 10);
final var handler = new ProcessingExceptionHandler(Clock.systemUTC(), Duration.ofMinutes(5), 10);

for (int i = 0; i < 9; i++) {
assertThat(handler.handle(new TimeoutException())).isEqualTo(StreamThreadExceptionResponse.REPLACE_THREAD);
Expand All @@ -36,14 +39,17 @@ void testHandleWithTransientErrorExceedingThreshold() {

@Test
void testHandleWithTransientErrorThresholdReset() {
final var handler = new ProcessingExceptionHandler(Duration.ofMillis(250), 2);
final var clockMock = mock(Clock.class);
when(clockMock.instant())
.thenReturn(Instant.EPOCH)
.thenReturn(Instant.EPOCH.plusMillis(250))
.thenReturn(Instant.EPOCH.plusSeconds(1).plusMillis(251));

final var handler = new ProcessingExceptionHandler(clockMock, Duration.ofSeconds(1), 2);

assertThat(handler.handle(new TimeoutException())).isEqualTo(StreamThreadExceptionResponse.REPLACE_THREAD);
assertThat(handler.handle(new TimeoutException())).isEqualTo(StreamThreadExceptionResponse.SHUTDOWN_CLIENT);

await()
.atMost(Duration.ofMillis(500))
.untilAsserted(() -> assertThat(handler.handle(new TimeoutException())).isEqualTo(StreamThreadExceptionResponse.REPLACE_THREAD));
assertThat(handler.handle(new TimeoutException())).isEqualTo(StreamThreadExceptionResponse.REPLACE_THREAD);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,21 @@
import org.apache.kafka.streams.errors.ProductionExceptionHandler.ProductionExceptionHandlerResponse;
import org.junit.jupiter.api.Test;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class ProductionExceptionHandlerTest {

@Test
void testHandle() {
final var record = new ProducerRecord<>("topic", 6, "key".getBytes(), "value".getBytes());
final var handler = new ProductionExceptionHandler(Duration.ofMinutes(5), 10);
final var handler = new ProductionExceptionHandler(Clock.systemUTC(), Duration.ofMinutes(5), 10);

for (int i = 0; i < 9; i++) {
assertThat(handler.handle(record, new RecordTooLargeException())).isEqualTo(ProductionExceptionHandlerResponse.CONTINUE);
Expand All @@ -26,8 +30,14 @@ final var record = new ProducerRecord<>("topic", 6, "key".getBytes(), "value".ge

@Test
void testHandleWithThresholdReset() {
final var clockMock = mock(Clock.class);
when(clockMock.instant())
.thenReturn(Instant.EPOCH)
.thenReturn(Instant.EPOCH.plusMillis(250))
.thenReturn(Instant.EPOCH.plusSeconds(1).plusMillis(251));

final var record = new ProducerRecord<>("topic", 6, "key".getBytes(), "value".getBytes());
final var handler = new ProductionExceptionHandler(Duration.ofMillis(250), 2);
final var handler = new ProductionExceptionHandler(clockMock, Duration.ofSeconds(1), 2);

assertThat(handler.handle(record, new RecordTooLargeException())).isEqualTo(ProductionExceptionHandlerResponse.CONTINUE);
assertThat(handler.handle(record, new RecordTooLargeException())).isEqualTo(ProductionExceptionHandlerResponse.FAIL);
Expand All @@ -40,7 +50,7 @@ final var record = new ProducerRecord<>("topic", 6, "key".getBytes(), "value".ge
@Test
void testHandleWithUnexpectedException() {
final var record = new ProducerRecord<>("topic", 6, "key".getBytes(), "value".getBytes());
final var handler = new ProductionExceptionHandler(Duration.ofMinutes(5), 10);
final var handler = new ProductionExceptionHandler(Clock.systemUTC(), Duration.ofMinutes(5), 10);

assertThat(handler.handle(record, new IllegalStateException())).isEqualTo(ProductionExceptionHandlerResponse.FAIL);
}
Expand Down

0 comments on commit 5ea4628

Please sign in to comment.