Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port Kafka Streams exception handler test flakiness fix #703

Merged
merged 1 commit into from
Jul 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,20 @@

import org.hyades.kstreams.exception.ExceptionHandlerConfig.ThresholdConfig;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;

abstract class AbstractThresholdBasedExceptionHandler {

private final Clock clock;
private final Duration exceptionThresholdInterval;
private final int exceptionThresholdCount;
private Instant firstExceptionOccurredAt;
private int exceptionOccurrences;

AbstractThresholdBasedExceptionHandler(final ThresholdConfig config) {
this.clock = Clock.systemUTC();
if (config != null) {
this.exceptionThresholdInterval = config.interval();
this.exceptionThresholdCount = config.count();
Expand All @@ -22,13 +25,14 @@ abstract class AbstractThresholdBasedExceptionHandler {
}
}

AbstractThresholdBasedExceptionHandler(final Duration exceptionThresholdInterval, final int exceptionThresholdCount) {
AbstractThresholdBasedExceptionHandler(final Clock clock, final Duration exceptionThresholdInterval, final int exceptionThresholdCount) {
this.clock = clock;
this.exceptionThresholdInterval = exceptionThresholdInterval;
this.exceptionThresholdCount = exceptionThresholdCount;
}

boolean exceedsThreshold() {
final Instant now = Instant.now();
final Instant now = Instant.now(clock);
if (firstExceptionOccurredAt == null) {
firstExceptionOccurredAt = now;
exceptionOccurrences = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Clock;
import java.time.Duration;
import java.util.Map;

Expand All @@ -27,8 +28,8 @@ public DeserializationExceptionHandler() {
.orElse(null));
}

DeserializationExceptionHandler(final Duration exceptionThresholdInterval, final int exceptionThresholdCount) {
super(exceptionThresholdInterval, exceptionThresholdCount);
DeserializationExceptionHandler(final Clock clock, final Duration exceptionThresholdInterval, final int exceptionThresholdCount) {
super(clock, exceptionThresholdInterval, exceptionThresholdCount);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
Expand All @@ -30,20 +31,24 @@ private record ExceptionOccurrence(Instant occurredFirstAt, int count) {
"org.hibernate.QueryTimeoutException"
);

private final Clock clock;
private final Map<Class<? extends Throwable>, ExceptionOccurrence> transientExceptionOccurrences;
private final Duration transientExceptionThresholdInterval;
private final int transientExceptionThresholdCount;


@SuppressWarnings("unused")
ProcessingExceptionHandler(final ExceptionHandlerConfig config) {
this.clock = Clock.systemUTC();
this.transientExceptionOccurrences = new ConcurrentHashMap<>();
this.transientExceptionThresholdInterval = config.thresholds().processing().interval();
this.transientExceptionThresholdCount = config.thresholds().processing().count();
}

ProcessingExceptionHandler(final Duration transientExceptionThresholdInterval,
ProcessingExceptionHandler(final Clock clock,
final Duration transientExceptionThresholdInterval,
final int transientExceptionThresholdCount) {
this.clock = clock;
this.transientExceptionOccurrences = new ConcurrentHashMap<>();
this.transientExceptionThresholdInterval = transientExceptionThresholdInterval;
this.transientExceptionThresholdCount = transientExceptionThresholdCount;
Expand All @@ -59,7 +64,7 @@ public StreamThreadExceptionResponse handle(final Throwable throwable) {
if (isTransient(rootCause)) {
final ExceptionOccurrence occurrence = transientExceptionOccurrences
.compute(rootCause.getClass(), (key, oldValue) -> {
final Instant now = Instant.now();
final Instant now = Instant.now(clock);
if (oldValue == null) {
return new ExceptionOccurrence(now, 1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Clock;
import java.time.Duration;
import java.util.Map;

Expand All @@ -26,9 +27,10 @@ public ProductionExceptionHandler() {
.orElse(null));
}

ProductionExceptionHandler(final Duration exceptionThresholdInterval,
ProductionExceptionHandler(final Clock clock,
final Duration exceptionThresholdInterval,
final int exceptionThresholdCount) {
super(exceptionThresholdInterval, exceptionThresholdCount);
super(clock, exceptionThresholdInterval, exceptionThresholdCount);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,21 @@
import org.apache.kafka.streams.processor.ProcessorContext;
import org.junit.jupiter.api.Test;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class DeserializationExceptionHandlerTest {

@Test
void testHandle() {
final var record = new ConsumerRecord<>("topic", 6, 3, "key".getBytes(), "value".getBytes());
final var processorContext = mock(ProcessorContext.class);
final var handler = new DeserializationExceptionHandler(Duration.ofMinutes(5), 10);
final var handler = new DeserializationExceptionHandler(Clock.systemUTC(), Duration.ofMinutes(5), 10);

for (int i = 0; i < 9; i++) {
assertThat(handler.handle(processorContext, record, new SerializationException())).isEqualTo(DeserializationHandlerResponse.CONTINUE);
Expand All @@ -29,16 +31,19 @@ final var record = new ConsumerRecord<>("topic", 6, 3, "key".getBytes(), "value"

@Test
void testHandleWithThresholdReset() {
final var clockMock = mock(Clock.class);
when(clockMock.instant())
.thenReturn(Instant.EPOCH)
.thenReturn(Instant.EPOCH.plusMillis(250))
.thenReturn(Instant.EPOCH.plusSeconds(1).plusMillis(251));

final var record = new ConsumerRecord<>("topic", 6, 3, "key".getBytes(), "value".getBytes());
final var processorContext = mock(ProcessorContext.class);
final var handler = new DeserializationExceptionHandler(Duration.ofMillis(250), 2);
final var handler = new DeserializationExceptionHandler(clockMock, Duration.ofSeconds(1), 2);

assertThat(handler.handle(processorContext, record, new SerializationException())).isEqualTo(DeserializationHandlerResponse.CONTINUE);
assertThat(handler.handle(processorContext, record, new SerializationException())).isEqualTo(DeserializationHandlerResponse.FAIL);

await()
.atMost(Duration.ofMillis(500))
.untilAsserted(() -> assertThat(handler.handle(processorContext, record, new SerializationException())).isEqualTo(DeserializationHandlerResponse.CONTINUE));
assertThat(handler.handle(processorContext, record, new SerializationException())).isEqualTo(DeserializationHandlerResponse.CONTINUE);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,32 @@
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;
import org.junit.jupiter.api.Test;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.TimeoutException;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class ProcessingExceptionHandlerTest {

@Test
void testHandleWithTransientError() {
final var handler = new ProcessingExceptionHandler(Duration.ofMinutes(5), 10);
final var handler = new ProcessingExceptionHandler(Clock.systemUTC(), Duration.ofMinutes(5), 10);
assertThat(handler.handle(new TimeoutException())).isEqualTo(StreamThreadExceptionResponse.REPLACE_THREAD);
}

@Test
void testHandleWithNonTransientError() {
final var handler = new ProcessingExceptionHandler(Duration.ofMinutes(5), 10);
final var handler = new ProcessingExceptionHandler(Clock.systemUTC(), Duration.ofMinutes(5), 10);
assertThat(handler.handle(new IllegalStateException())).isEqualTo(StreamThreadExceptionResponse.SHUTDOWN_CLIENT);
}

@Test
void testHandleWithTransientErrorExceedingThreshold() {
final var handler = new ProcessingExceptionHandler(Duration.ofMinutes(5), 10);
final var handler = new ProcessingExceptionHandler(Clock.systemUTC(), Duration.ofMinutes(5), 10);

for (int i = 0; i < 9; i++) {
assertThat(handler.handle(new TimeoutException())).isEqualTo(StreamThreadExceptionResponse.REPLACE_THREAD);
Expand All @@ -36,14 +39,17 @@ void testHandleWithTransientErrorExceedingThreshold() {

@Test
void testHandleWithTransientErrorThresholdReset() {
final var handler = new ProcessingExceptionHandler(Duration.ofMillis(250), 2);
final var clockMock = mock(Clock.class);
when(clockMock.instant())
.thenReturn(Instant.EPOCH)
.thenReturn(Instant.EPOCH.plusMillis(250))
.thenReturn(Instant.EPOCH.plusSeconds(1).plusMillis(251));

final var handler = new ProcessingExceptionHandler(clockMock, Duration.ofSeconds(1), 2);

assertThat(handler.handle(new TimeoutException())).isEqualTo(StreamThreadExceptionResponse.REPLACE_THREAD);
assertThat(handler.handle(new TimeoutException())).isEqualTo(StreamThreadExceptionResponse.SHUTDOWN_CLIENT);

await()
.atMost(Duration.ofMillis(500))
.untilAsserted(() -> assertThat(handler.handle(new TimeoutException())).isEqualTo(StreamThreadExceptionResponse.REPLACE_THREAD));
assertThat(handler.handle(new TimeoutException())).isEqualTo(StreamThreadExceptionResponse.REPLACE_THREAD);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,21 @@
import org.apache.kafka.streams.errors.ProductionExceptionHandler.ProductionExceptionHandlerResponse;
import org.junit.jupiter.api.Test;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class ProductionExceptionHandlerTest {

@Test
void testHandle() {
final var record = new ProducerRecord<>("topic", 6, "key".getBytes(), "value".getBytes());
final var handler = new ProductionExceptionHandler(Duration.ofMinutes(5), 10);
final var handler = new ProductionExceptionHandler(Clock.systemUTC(), Duration.ofMinutes(5), 10);

for (int i = 0; i < 9; i++) {
assertThat(handler.handle(record, new RecordTooLargeException())).isEqualTo(ProductionExceptionHandlerResponse.CONTINUE);
Expand All @@ -26,8 +30,14 @@ final var record = new ProducerRecord<>("topic", 6, "key".getBytes(), "value".ge

@Test
void testHandleWithThresholdReset() {
final var clockMock = mock(Clock.class);
when(clockMock.instant())
.thenReturn(Instant.EPOCH)
.thenReturn(Instant.EPOCH.plusMillis(250))
.thenReturn(Instant.EPOCH.plusSeconds(1).plusMillis(251));

final var record = new ProducerRecord<>("topic", 6, "key".getBytes(), "value".getBytes());
final var handler = new ProductionExceptionHandler(Duration.ofMillis(250), 2);
final var handler = new ProductionExceptionHandler(clockMock, Duration.ofSeconds(1), 2);

assertThat(handler.handle(record, new RecordTooLargeException())).isEqualTo(ProductionExceptionHandlerResponse.CONTINUE);
assertThat(handler.handle(record, new RecordTooLargeException())).isEqualTo(ProductionExceptionHandlerResponse.FAIL);
Expand All @@ -40,7 +50,7 @@ final var record = new ProducerRecord<>("topic", 6, "key".getBytes(), "value".ge
@Test
void testHandleWithUnexpectedException() {
final var record = new ProducerRecord<>("topic", 6, "key".getBytes(), "value".getBytes());
final var handler = new ProductionExceptionHandler(Duration.ofMinutes(5), 10);
final var handler = new ProductionExceptionHandler(Clock.systemUTC(), Duration.ofMinutes(5), 10);

assertThat(handler.handle(record, new IllegalStateException())).isEqualTo(ProductionExceptionHandlerResponse.FAIL);
}
Expand Down
Loading