diff --git a/s3/pom.xml b/s3/pom.xml index 8a9a843..1a0cc9d 100644 --- a/s3/pom.xml +++ b/s3/pom.xml @@ -32,7 +32,7 @@ com.google.guava guava - 20.0 + 24.1.1-jre diff --git a/s3/src/main/java/com/instaclustr/kafka/connect/s3/source/AwsStorageSourceTask.java b/s3/src/main/java/com/instaclustr/kafka/connect/s3/source/AwsStorageSourceTask.java index 97f5673..37a1289 100644 --- a/s3/src/main/java/com/instaclustr/kafka/connect/s3/source/AwsStorageSourceTask.java +++ b/s3/src/main/java/com/instaclustr/kafka/connect/s3/source/AwsStorageSourceTask.java @@ -2,6 +2,7 @@ import com.amazonaws.AmazonClientException; import com.google.common.util.concurrent.RateLimiter; +import com.google.common.util.concurrent.UncheckedExecutionException; import com.google.common.util.concurrent.UncheckedTimeoutException; import com.instaclustr.kafka.connect.s3.AwsConnectorStringFormats; import com.instaclustr.kafka.connect.s3.AwsStorageConnectorCommonConfig; @@ -16,7 +17,9 @@ import java.io.IOException; import java.util.*; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; public class AwsStorageSourceTask extends SourceTask { @@ -139,20 +142,24 @@ public List poll() { } catch (InterruptedException e) { log.info("Thread interrupted in poll. 
Shutting down", e); Thread.currentThread().interrupt(); - } catch (AmazonClientException exception) { - if (!exception.isRetryable()) { - throw exception; - } else { - log.warn("Retryable S3 service exception while reading from s3", exception); + } catch (UncheckedExecutionException | ExecutionException | TimeoutException e) { + Throwable exceptionCause = e.getCause(); + if (exceptionCause instanceof AmazonClientException) { + AmazonClientException amazonClientException = (AmazonClientException) exceptionCause; + if (!amazonClientException.isRetryable()) { + throw amazonClientException; + } else { + log.warn("Retryable S3 service exception while reading from s3", e); + if (topicPartition != null) { + awsSourceReader.revertAwsReadPositionMarker(topicPartition); + } + } + } else if (exceptionCause instanceof IOException || e instanceof TimeoutException) { + log.warn("Retryable exception while reading from s3", e); if (topicPartition != null) { awsSourceReader.revertAwsReadPositionMarker(topicPartition); } - } + } else { throw (exceptionCause instanceof RuntimeException) ? (RuntimeException) exceptionCause : new RuntimeException("Unexpected exception while reading from s3", e); } - } catch (IOException | UncheckedTimeoutException exception) { - log.warn("Retryable exception while reading from s3", exception); - if (topicPartition != null) { - awsSourceReader.revertAwsReadPositionMarker(topicPartition); - } } catch (RuntimeException e){ throw e; } catch (Exception e) { diff --git a/s3/src/main/java/com/instaclustr/kafka/connect/s3/source/TopicPartitionSegmentParser.java b/s3/src/main/java/com/instaclustr/kafka/connect/s3/source/TopicPartitionSegmentParser.java index c8c8081..2611180 100644 --- a/s3/src/main/java/com/instaclustr/kafka/connect/s3/source/TopicPartitionSegmentParser.java +++ b/s3/src/main/java/com/instaclustr/kafka/connect/s3/source/TopicPartitionSegmentParser.java @@ -1,5 +1,6 @@ package com.instaclustr.kafka.connect.s3.source; +import com.amazonaws.AmazonClientException; import com.google.common.util.concurrent.SimpleTimeLimiter; import com.google.common.util.concurrent.TimeLimiter; import 
com.instaclustr.kafka.connect.s3.AwsConnectorStringFormats; @@ -12,9 +13,7 @@ import java.io.IOException; import java.io.InputStream; import java.util.HashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.regex.Matcher; /** @@ -62,7 +61,7 @@ public TopicPartitionSegmentParser(final InputStream s3ObjectInputStream, final this.topicPrefix = topicPrefix; this.targetTopic = AwsConnectorStringFormats.generateTargetTopic(topicPrefix, topic); this.singleThreadExecutor = Executors.newSingleThreadExecutor(); - this.timeLimiter = new SimpleTimeLimiter(this.singleThreadExecutor); + this.timeLimiter = SimpleTimeLimiter.create(this.singleThreadExecutor); } public void closeResources() throws IOException, InterruptedException { @@ -99,7 +98,7 @@ private SourceRecord getNextRecord() throws IOException { //blocking call public SourceRecord getNextRecord(Long time, TimeUnit units) throws Exception { try { - return this.timeLimiter.callWithTimeout(this::getNextRecord, time, units, true); + return this.timeLimiter.callWithTimeout(this::getNextRecord, time, units); } catch (Exception e) { this.closeResources(); //not possible to read from this stream after a timeout as read positions gets messed up throw e; diff --git a/s3/src/test/java/com/instaclustr/kafka/connect/s3/source/AwsStorageSourceTaskTest.java b/s3/src/test/java/com/instaclustr/kafka/connect/s3/source/AwsStorageSourceTaskTest.java index a0ac0e1..d367bd4 100644 --- a/s3/src/test/java/com/instaclustr/kafka/connect/s3/source/AwsStorageSourceTaskTest.java +++ b/s3/src/test/java/com/instaclustr/kafka/connect/s3/source/AwsStorageSourceTaskTest.java @@ -16,6 +16,8 @@ import java.time.temporal.ChronoUnit; import java.util.Collections; import java.util.HashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; import static org.mockito.Mockito.*; @@ -54,7 
+56,7 @@ public void givenNonResponsiveObjectStreamResetReadPosition() throws Exception { doReturn("test").when(mockTopicPartitionSegmentParser).getTopic(); doReturn(0).when(mockTopicPartitionSegmentParser).getPartition(); - doThrow(new UncheckedTimeoutException()).when(mockTopicPartitionSegmentParser).getNextRecord(any(), any()); + doThrow(new TimeoutException()).when(mockTopicPartitionSegmentParser).getNextRecord(any(), any()); AwsStorageSourceTask awsStorageSourceTask = new AwsStorageSourceTask(mockTransferManagerProvider, mockAwsSourceReader); awsStorageSourceTask.poll(); @@ -71,7 +73,7 @@ public void givenObjectStreamThatThrowsIOExceptionResetReadPosition() throws Exc doReturn(mockTopicPartitionSegmentParser).when(mockAwsSourceReader).getNextTopicPartitionSegmentParser(); doReturn("test").when(mockTopicPartitionSegmentParser).getTopic(); doReturn(0).when(mockTopicPartitionSegmentParser).getPartition(); - doThrow(new IOException()).when(mockTopicPartitionSegmentParser).getNextRecord(any(), any()); + doThrow(new ExecutionException(new IOException())).when(mockTopicPartitionSegmentParser).getNextRecord(any(), any()); AwsStorageSourceTask awsStorageSourceTask = new AwsStorageSourceTask(mockTransferManagerProvider, mockAwsSourceReader); awsStorageSourceTask.poll(); diff --git a/s3/src/test/java/com/instaclustr/kafka/connect/s3/source/TopicPartitionSegmentParserTest.java b/s3/src/test/java/com/instaclustr/kafka/connect/s3/source/TopicPartitionSegmentParserTest.java index da728a3..46dd6c4 100644 --- a/s3/src/test/java/com/instaclustr/kafka/connect/s3/source/TopicPartitionSegmentParserTest.java +++ b/s3/src/test/java/com/instaclustr/kafka/connect/s3/source/TopicPartitionSegmentParserTest.java @@ -13,7 +13,9 @@ import org.testng.annotations.Test; import java.io.*; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; public class TopicPartitionSegmentParserTest { @@ -72,7 +74,7 @@ public 
void givenNonResponsiveStreamTriggerTimeoutOnDefinedTimePeriod() throws I try (PipedInputStream empty = new PipedInputStream(1)) { pipedOutputStream = new PipedOutputStream(empty); //making sure we just have an unresponsive stream and not throwing an ioexception TopicPartitionSegmentParser topicPartitionSegmentParser = new TopicPartitionSegmentParser(empty, s3ObjectKey, ""); - Assert.expectThrows(UncheckedTimeoutException.class, () -> topicPartitionSegmentParser.getNextRecord(100L, TimeUnit.MILLISECONDS)); + Assert.expectThrows(TimeoutException.class, () -> topicPartitionSegmentParser.getNextRecord(5L, TimeUnit.MILLISECONDS)); } finally { if (pipedOutputStream != null) { pipedOutputStream.close(); @@ -103,7 +105,12 @@ public void givenClosedStreamThrowIoException() throws IOException { InputStream nullInputStream = InputStream.nullInputStream(); TopicPartitionSegmentParser topicPartitionSegmentParser = new TopicPartitionSegmentParser(nullInputStream, s3ObjectKey, ""); nullInputStream.close(); - Assert.expectThrows(IOException.class, () -> topicPartitionSegmentParser.getNextRecord(5L, TimeUnit.SECONDS)); + try { Assert.fail("Expected ExecutionException but getNextRecord returned: " + topicPartitionSegmentParser.getNextRecord(5L, TimeUnit.SECONDS)); } catch (Exception e) { Assert.assertTrue(e instanceof ExecutionException); Assert.assertTrue(e.getCause() instanceof IOException); } } @Test