diff --git a/.github/workflows/Build.yml b/.github/workflows/Build.yml index 67a5b1f1..54571a2a 100644 --- a/.github/workflows/Build.yml +++ b/.github/workflows/Build.yml @@ -11,7 +11,7 @@ jobs: - name: Set up JDK uses: actions/setup-java@v3 with: - java-version: '17' + java-version: '8' distribution: 'temurin' cache: maven - name: verify diff --git a/.github/workflows/Deploy.yml b/.github/workflows/Deploy.yml index 348f5303..c383451e 100644 --- a/.github/workflows/Deploy.yml +++ b/.github/workflows/Deploy.yml @@ -12,7 +12,7 @@ jobs: - name: Set up JDK uses: actions/setup-java@v3 with: - java-version: '17' + java-version: '8' distribution: 'temurin' cache: maven server-id: ossrh diff --git a/CHANGELOG.md b/CHANGELOG.md index cfdfa393..191883fb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,9 +1,3 @@ -# 1.0.0 - -* maximum amount of messages per batch must be set manually if custom value is preferred (default remains 10) -* compatible with Spring Boot 3.1.* -* use `S3AsyncClient` instead of synchronous `S3Client` - # 0.8.1 * pass MDC context to workers diff --git a/pom.xml b/pom.xml index 0bafd5fe..03282585 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ sqs-utils com.mercateo.sqs sqs-utils - 1.0.0-SNAPSHOT + 0.9.1-SNAPSHOT jar Some utility classes for AWS SQS @@ -39,7 +39,7 @@ scm:git:git@github.com:Mercateo/sqs-utils.git scm:git:git@github.com:Mercateo/sqs-utils.git https://github.com/Mercateo/sqs-utils.git - HEAD + 0.8.0 @@ -59,21 +59,13 @@ UTF-8 2.14.2 - 2.27.17 - 3.0.4 + 1.12.406 2.0.6 5.9.2 - - software.amazon.awssdk - bom - ${aws-sdk-sqs.version} - pom - import - com.fasterxml.jackson jackson-bom @@ -93,42 +85,31 @@ - software.amazon.awssdk - sqs + com.amazonaws + aws-java-sdk-sqs + ${aws-sdk-sqs.version} org.projectlombok lombok - 1.18.34 + 1.18.26 provided - io.awspring.cloud - spring-cloud-aws-dependencies - ${spring-cloud-aws.version} - pom + javax.inject + javax.inject + 1 - - io.awspring.cloud - spring-cloud-aws-sqs - ${spring-cloud-aws.version} - - - org.springframework - spring-messaging - 6.0.11 - - - jakarta.inject - jakarta.inject-api - 2.0.0 - - org.slf4j slf4j-api ${slf4j.version} + + io.awspring.cloud + spring-cloud-aws-messaging + 2.4.4 + com.github.rholder guava-retrying @@ -157,7 +138,8 @@ org.mockito mockito-core - 5.12.0 + + 4.11.0 test @@ -193,11 +175,11 @@ maven-compiler-plugin - 3.13.0 + 3.6.1 - 17 - 17 - 17 + 1.8 + 1.8 + 1.8 true true true diff --git a/src/main/java/com/mercateo/sqs/utils/message/handling/DefaultErrorHandlingStrategy.java b/src/main/java/com/mercateo/sqs/utils/message/handling/DefaultErrorHandlingStrategy.java index 1d361ea3..92e6dc21 100644 --- a/src/main/java/com/mercateo/sqs/utils/message/handling/DefaultErrorHandlingStrategy.java +++ b/src/main/java/com/mercateo/sqs/utils/message/handling/DefaultErrorHandlingStrategy.java @@ -15,45 +15,46 @@ */ package com.mercateo.sqs.utils.message.handling; -import java.util.Objects; +import com.amazonaws.AmazonServiceException; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; -import software.amazon.awssdk.awscore.exception.AwsServiceException; +import org.springframework.messaging.Message; @Slf4j class DefaultErrorHandlingStrategy implements ErrorHandlingStrategy { @Override @SneakyThrows - public void handleWorkerException(Exception e, MessageWrapper message) { - log.error("error while handling message " + message.getMessageId() + ": " + message.getMessage().getPayload(), e); + public void handleWorkerException(Exception e, Message message) { + String messageId = message.getHeaders().get("MessageId", String.class); + log.error("error while handling message " + messageId + ": " + message.getPayload(), e); throw e; } @Override @SneakyThrows - public void handleWorkerThrowable(Throwable t, MessageWrapper message) { - String messageId = String.valueOf(message.getMessageId()); - log.error("error while handling message " + messageId + ": " + message.getMessage().getPayload(), t); + public void handleWorkerThrowable(Throwable t, Message message) { + String messageId = message.getHeaders().get("MessageId", String.class); + log.error("error while handling message " + messageId + ": " + message.getPayload(), t); throw t; } @Override - public void handleExtendVisibilityTimeoutException(AwsServiceException e, - MessageWrapper message) { + public void handleExtendVisibilityTimeoutException(AmazonServiceException e, + Message message) { - String msg = "error while extending message visibility for " + Objects.requireNonNull( - message.getMessageId()); + String msg = "error while extending message visibility for " + message.getHeaders().get("MessageId", + String.class); log.error(msg, e); throw e; } @Override - public void handleAcknowledgeMessageException(AwsServiceException e, MessageWrapper message) { - String messageId = message.getMessageId(); + public void handleAcknowledgeMessageException(AmazonServiceException e, Message message) { + String messageId = message.getHeaders().get("MessageId", String.class); log.error("could not acknowledge " + messageId, e); } diff --git a/src/main/java/com/mercateo/sqs/utils/message/handling/ErrorHandlingStrategy.java b/src/main/java/com/mercateo/sqs/utils/message/handling/ErrorHandlingStrategy.java index 8e68f7e4..38a87f1c 100644 --- a/src/main/java/com/mercateo/sqs/utils/message/handling/ErrorHandlingStrategy.java +++ b/src/main/java/com/mercateo/sqs/utils/message/handling/ErrorHandlingStrategy.java @@ -15,7 +15,9 @@ */ package com.mercateo.sqs.utils.message.handling; -import software.amazon.awssdk.awscore.exception.AwsServiceException; +import com.amazonaws.AmazonServiceException; + +import org.springframework.messaging.Message; public interface ErrorHandlingStrategy { @@ -28,7 +30,7 @@ public interface ErrorHandlingStrategy { * @param message * that was incorrectly processed */ - void handleWorkerException(Exception e, MessageWrapper message); + void handleWorkerException(Exception e, Message message); /** * Defines how a throwable, that is thrown by the worker are handled. If a @@ -39,7 +41,7 @@ public interface ErrorHandlingStrategy { * @param message * that was incorrectly processed */ - void handleWorkerThrowable(Throwable t, MessageWrapper message); + void handleWorkerThrowable(Throwable t, Message message); /** * Defines how exceptions, that are thrown by the timeout extension are handled. @@ -50,7 +52,7 @@ public interface ErrorHandlingStrategy { * @param message that was tried to extend */ - void handleExtendVisibilityTimeoutException(AwsServiceException e, MessageWrapper message); + void handleExtendVisibilityTimeoutException(AmazonServiceException e, Message message); /** * Defines how exceptions, that are thrown by the message acknowledgement are handled. @@ -61,6 +63,8 @@ public interface ErrorHandlingStrategy { * @param message that was tried to extend */ - void handleAcknowledgeMessageException(AwsServiceException e, MessageWrapper message); + void handleAcknowledgeMessageException(AmazonServiceException e, Message message); + + } diff --git a/src/main/java/com/mercateo/sqs/utils/message/handling/LongRunningMessageHandler.java b/src/main/java/com/mercateo/sqs/utils/message/handling/LongRunningMessageHandler.java index ca827809..834e77f8 100644 --- a/src/main/java/com/mercateo/sqs/utils/message/handling/LongRunningMessageHandler.java +++ b/src/main/java/com/mercateo/sqs/utils/message/handling/LongRunningMessageHandler.java @@ -19,6 +19,8 @@ import com.mercateo.sqs.utils.visibility.VisibilityTimeoutExtenderFactory; import java.time.Duration; +import java.util.Map; +import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -27,6 +29,7 @@ import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; +import org.slf4j.MDC; import org.springframework.messaging.Message; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; @@ -78,7 +81,24 @@ public class LongRunningMessageHandler { this.awaitShutDown = awaitShutDown; this.errorHandlingStrategy = errorHandlingStrategy; - messageProcessingExecutor = new ThreadPoolTaskExecutor(); + messageProcessingExecutor = new ThreadPoolTaskExecutor() { + @Override + public Future submit(Runnable task) { + Map current = MDC.getCopyOfContextMap(); + return super.submit(() -> { + try { + if (current != null) { + current.forEach(MDC::put); + } + task.run(); + } finally { + if (current != null) { + current.keySet().forEach(MDC::remove); + } + } + }); + } + }; messageProcessingExecutor.setCorePoolSize(numberOfThreads); messageProcessingExecutor.setMaxPoolSize(numberOfThreads); messageProcessingExecutor.setThreadNamePrefix(getClass().getSimpleName()+"-"+queue.getName().getId()+"-"); @@ -131,7 +151,6 @@ public class LongRunningMessageHandler { public void handleMessage(@NonNull Message message) { MessageWrapper messageWrapper = new MessageWrapper<>(message); String messageId = messageWrapper.getMessageId(); - if (messagesInProcessing.contains(messageId)) { return; } @@ -173,17 +192,17 @@ public int getFreeWorkerCapacity() { return messagesInProcessing.free(); } - private void scheduleNewMessageTask(@NonNull MessageWrapper message, + private void scheduleNewMessageTask(@NonNull MessageWrapper messageWrapper, ScheduledFuture visibilityTimeoutExtender) { MessageHandlingRunnable messageTask = messageHandlingRunnableFactory.get(worker, - message, finishedMessageCallback, messagesInProcessing, visibilityTimeoutExtender, errorHandlingStrategy); + messageWrapper, finishedMessageCallback, messagesInProcessing, visibilityTimeoutExtender, errorHandlingStrategy); messageProcessingExecutor.submit(messageTask); } - private ScheduledFuture scheduleNewVisibilityTimeoutExtender(@NonNull MessageWrapper message) { + private ScheduledFuture scheduleNewVisibilityTimeoutExtender(@NonNull MessageWrapper messageWrapper) { return timeoutExtensionExecutor.scheduleAtFixedRate( - timeoutExtenderFactory.get(message, queue, errorHandlingStrategy), + timeoutExtenderFactory.get(messageWrapper, queue, errorHandlingStrategy), timeUntilVisibilityTimeoutExtension.toMillis(), timeUntilVisibilityTimeoutExtension.toMillis(), TimeUnit.MILLISECONDS); diff --git a/src/main/java/com/mercateo/sqs/utils/message/handling/LongRunningMessageHandlerFactory.java b/src/main/java/com/mercateo/sqs/utils/message/handling/LongRunningMessageHandlerFactory.java index 1d01ddd8..3155417d 100644 --- a/src/main/java/com/mercateo/sqs/utils/message/handling/LongRunningMessageHandlerFactory.java +++ b/src/main/java/com/mercateo/sqs/utils/message/handling/LongRunningMessageHandlerFactory.java @@ -15,19 +15,21 @@ */ package com.mercateo.sqs.utils.message.handling; -import com.google.common.annotations.VisibleForTesting; import com.mercateo.sqs.utils.queue.Queue; import com.mercateo.sqs.utils.queue.QueueFactory; import com.mercateo.sqs.utils.queue.QueueName; import com.mercateo.sqs.utils.visibility.VisibilityTimeoutExtenderFactory; +import io.awspring.cloud.messaging.listener.SimpleMessageListenerContainer; + +import java.lang.reflect.Field; import java.time.Duration; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; -import jakarta.inject.Inject; -import jakarta.inject.Named; +import javax.inject.Inject; +import javax.inject.Named; import lombok.NonNull; @@ -43,13 +45,15 @@ public class LongRunningMessageHandlerFactory { private final ScheduledExecutorService executorService; - private int maxNumberOfMessagesPerBatch; + // visible for testing + final int maxNumberOfMessagesPerBatch; @Inject public LongRunningMessageHandlerFactory( @NonNull MessageHandlingRunnableFactory messageHandlingRunnableFactory, @NonNull VisibilityTimeoutExtenderFactory timeoutExtenderFactory, - @NonNull QueueFactory queueFactory) { + @NonNull QueueFactory queueFactory, + @NonNull SimpleMessageListenerContainer simpleMessageListenerContainer) { this.messageHandlingRunnableFactory = messageHandlingRunnableFactory; this.timeoutExtenderFactory = timeoutExtenderFactory; this.queueFactory = queueFactory; @@ -69,16 +73,27 @@ public Thread newThread(Runnable r) { } }); - this.maxNumberOfMessagesPerBatch = 10; - } - - public void setMaxConcurrentMessages(Integer maxConcurrentMessages) { - this.maxNumberOfMessagesPerBatch = maxConcurrentMessages; + this.maxNumberOfMessagesPerBatch = extractMaxNumberOfMessagesFromListenerContainer( + simpleMessageListenerContainer); } - @VisibleForTesting - int getMaxConcurrentMessages() { - return this.maxNumberOfMessagesPerBatch; + private int extractMaxNumberOfMessagesFromListenerContainer( + @NonNull SimpleMessageListenerContainer simpleMessageListenerContainer) { + try { + Field f = simpleMessageListenerContainer.getClass().getSuperclass().getDeclaredField( + "maxNumberOfMessages"); + f.setAccessible(true); + Integer maxNumberOfMessages = (Integer) f.get(simpleMessageListenerContainer); + if (maxNumberOfMessages != null) { + return maxNumberOfMessages; + } else { + // org.springframework.cloud.aws.messaging.listener.AbstractMessageListenerContainer.DEFAULT_MAX_NUMBER_OF_MESSAGES + return 10; + } + } catch (NoSuchFieldException | IllegalAccessException e) { + throw new IllegalStateException( + "Cannot get BatchSize of SimpleMessageListenerContainer", e); + } } /** diff --git a/src/main/java/com/mercateo/sqs/utils/message/handling/MessageHandlingRunnable.java b/src/main/java/com/mercateo/sqs/utils/message/handling/MessageHandlingRunnable.java index 84b13dd3..a86f916f 100644 --- a/src/main/java/com/mercateo/sqs/utils/message/handling/MessageHandlingRunnable.java +++ b/src/main/java/com/mercateo/sqs/utils/message/handling/MessageHandlingRunnable.java @@ -14,6 +14,8 @@ */ package com.mercateo.sqs.utils.message.handling; +import com.amazonaws.AmazonServiceException; + import java.util.concurrent.ScheduledFuture; import lombok.NonNull; @@ -21,7 +23,6 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.messaging.MessageHeaders; -import software.amazon.awssdk.awscore.exception.AwsServiceException; @Slf4j @RequiredArgsConstructor @@ -57,14 +58,14 @@ public void run() { finishedMessageCallback.call(payload, outcome); acknowledge(); - log.info("message task successfully processed and message acknowledged: " + messageId); + log.info("message task successfully processed and message acknowledged: " + messageId); // } catch (InterruptedException e) { log.info("got interrupted, did not finish: " + messageId, e); } catch (Exception e) { - errorHandlingStrategy.handleWorkerException(e, messageWrapper); + errorHandlingStrategy.handleWorkerException(e, messageWrapper.getMessage()); acknowledge(); } catch (Throwable t) { - errorHandlingStrategy.handleWorkerThrowable(t, messageWrapper); + errorHandlingStrategy.handleWorkerThrowable(t, messageWrapper.getMessage()); acknowledge(); } finally { visibilityTimeoutExtender.cancel(false); @@ -75,8 +76,8 @@ public void run() { private void acknowledge() { try { messageWrapper.acknowledge(); - } catch (AwsServiceException e) { - errorHandlingStrategy.handleAcknowledgeMessageException(e, messageWrapper); + } catch (AmazonServiceException e) { + errorHandlingStrategy.handleAcknowledgeMessageException(e, messageWrapper.getMessage()); } catch (Exception e) { log.error("failure during acknowledge " + messageWrapper.getMessageId(), e); } diff --git a/src/main/java/com/mercateo/sqs/utils/message/handling/MessageHandlingRunnableFactory.java b/src/main/java/com/mercateo/sqs/utils/message/handling/MessageHandlingRunnableFactory.java index 3d91276c..c7cd77b2 100644 --- a/src/main/java/com/mercateo/sqs/utils/message/handling/MessageHandlingRunnableFactory.java +++ b/src/main/java/com/mercateo/sqs/utils/message/handling/MessageHandlingRunnableFactory.java @@ -17,7 +17,7 @@ import java.util.concurrent.ScheduledFuture; -import jakarta.inject.Named; +import javax.inject.Named; import lombok.NonNull; diff --git a/src/main/java/com/mercateo/sqs/utils/message/handling/MessageWrapper.java b/src/main/java/com/mercateo/sqs/utils/message/handling/MessageWrapper.java index b042ad9f..ce167bbe 100644 --- a/src/main/java/com/mercateo/sqs/utils/message/handling/MessageWrapper.java +++ b/src/main/java/com/mercateo/sqs/utils/message/handling/MessageWrapper.java @@ -1,19 +1,18 @@ package com.mercateo.sqs.utils.message.handling; -import io.awspring.cloud.sqs.MessagingHeaders; +import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.model.ChangeMessageVisibilityRequest; + +import io.awspring.cloud.messaging.listener.Acknowledgment; -import java.util.UUID; import java.util.concurrent.TimeUnit; -import io.awspring.cloud.sqs.listener.acknowledgement.AcknowledgementCallback; import lombok.Getter; import lombok.NonNull; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; import org.springframework.messaging.Message; -import software.amazon.awssdk.services.sqs.SqsAsyncClient; -import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest; @RequiredArgsConstructor public class MessageWrapper { @@ -25,7 +24,7 @@ public class MessageWrapper { private boolean acknowledged = false; public String getMessageId() { - return String.valueOf(message.getHeaders().get("id", UUID.class)); + return message.getHeaders().get("MessageId", String.class); } public String getReceiptHandle() { @@ -34,24 +33,18 @@ public String getReceiptHandle() { @SneakyThrows public synchronized void acknowledge() { - AcknowledgementCallback acknowledgementCallback = message.getHeaders().get( - MessagingHeaders.ACKNOWLEDGMENT_CALLBACK_HEADER, AcknowledgementCallback.class); - if (acknowledgementCallback == null) { - throw new NullPointerException("There is no \"AcknowledgementCallback\" in the message headers"); - } - try { - acknowledgementCallback.onAcknowledge(message).get(2, TimeUnit.MINUTES); - acknowledged = true; - } catch (Exception e) { - throw new RuntimeException("Failed to acknowledge message", e); + Acknowledgment acknowledgment = message.getHeaders().get("Acknowledgment", Acknowledgment.class); + if (acknowledgment == null) { + throw new NullPointerException("there is no \"Acknowledgment\" in the message headers"); } + acknowledgment.acknowledge().get(2, TimeUnit.MINUTES); + acknowledged = true; } - @SneakyThrows - public synchronized void changeMessageVisibility(SqsAsyncClient sqsClient, ChangeMessageVisibilityRequest request) { + public synchronized void changeMessageVisibility(AmazonSQS sqsClient, ChangeMessageVisibilityRequest request) { if (acknowledged) { return; } - sqsClient.changeMessageVisibility(request).get(); + sqsClient.changeMessageVisibility(request); } } diff --git a/src/main/java/com/mercateo/sqs/utils/queue/Queue.java b/src/main/java/com/mercateo/sqs/utils/queue/Queue.java index 51f99232..31f6c5cf 100644 --- a/src/main/java/com/mercateo/sqs/utils/queue/Queue.java +++ b/src/main/java/com/mercateo/sqs/utils/queue/Queue.java @@ -20,7 +20,6 @@ import lombok.Data; import lombok.NonNull; -import software.amazon.awssdk.services.sqs.model.QueueAttributeName; @Data public class Queue { @@ -32,9 +31,9 @@ public class Queue { private final String url; @NonNull - private final Map queueAttributes; + private final Map queueAttributes; public Duration getDefaultVisibilityTimeout() { - return Duration.ofSeconds(Integer.parseInt(queueAttributes.get(QueueAttributeName.VISIBILITY_TIMEOUT))); + return Duration.ofSeconds(Integer.parseInt(queueAttributes.get("VisibilityTimeout"))); } } \ No newline at end of file diff --git a/src/main/java/com/mercateo/sqs/utils/queue/QueueFactory.java b/src/main/java/com/mercateo/sqs/utils/queue/QueueFactory.java index 159e8f6d..2df1785f 100644 --- a/src/main/java/com/mercateo/sqs/utils/queue/QueueFactory.java +++ b/src/main/java/com/mercateo/sqs/utils/queue/QueueFactory.java @@ -15,42 +15,36 @@ */ package com.mercateo.sqs.utils.queue; +import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.model.GetQueueAttributesRequest; +import com.amazonaws.services.sqs.model.GetQueueUrlRequest; + import java.util.Collections; import java.util.Map; -import jakarta.inject.Inject; -import jakarta.inject.Named; +import javax.inject.Inject; +import javax.inject.Named; import lombok.NonNull; -import lombok.SneakyThrows; -import software.amazon.awssdk.services.sqs.SqsAsyncClient; -import software.amazon.awssdk.services.sqs.model.GetQueueAttributesRequest; -import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest; -import software.amazon.awssdk.services.sqs.model.QueueAttributeName; @Named public class QueueFactory { - private final SqsAsyncClient amazonSQS; + private final AmazonSQS amazonSQS; @Inject - public QueueFactory(@NonNull SqsAsyncClient amazonSQS) { + public QueueFactory(@NonNull AmazonSQS amazonSQS) { this.amazonSQS = amazonSQS; } - @SneakyThrows public Queue get(@NonNull QueueName queueName) { - GetQueueUrlRequest urlRequest = GetQueueUrlRequest.builder() - .queueName(queueName.getId()) - .build(); - String queueUrl = amazonSQS.getQueueUrl(urlRequest).get().queueUrl(); - - GetQueueAttributesRequest attributesRequest = GetQueueAttributesRequest - .builder() - .queueUrl(queueUrl) - .attributeNamesWithStrings(Collections.singletonList("All")).build(); - Map attributes = amazonSQS.getQueueAttributes(attributesRequest) - .get().attributes(); + GetQueueUrlRequest urlRequest = new GetQueueUrlRequest().withQueueName(queueName.getId()); + String queueUrl = amazonSQS.getQueueUrl(urlRequest).getQueueUrl(); + + GetQueueAttributesRequest attributesRequest = new GetQueueAttributesRequest(queueUrl, + Collections.singletonList("All")); + Map attributes = amazonSQS.getQueueAttributes(attributesRequest) + .getAttributes(); return new Queue(queueName, queueUrl, attributes); } diff --git a/src/main/java/com/mercateo/sqs/utils/visibility/VisibilityTimeoutExtender.java b/src/main/java/com/mercateo/sqs/utils/visibility/VisibilityTimeoutExtender.java index e9958569..98b2b775 100644 --- a/src/main/java/com/mercateo/sqs/utils/visibility/VisibilityTimeoutExtender.java +++ b/src/main/java/com/mercateo/sqs/utils/visibility/VisibilityTimeoutExtender.java @@ -1,12 +1,12 @@ /** * Copyright © 2017 Mercateo AG (http://www.mercateo.com) - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at *

* http://www.apache.org/licenses/LICENSE-2.0 - *

+ *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -15,10 +15,9 @@ */ package com.mercateo.sqs.utils.visibility; -import software.amazon.awssdk.awscore.exception.AwsServiceException; -import software.amazon.awssdk.services.sqs.*; -import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest; - +import com.amazonaws.AmazonServiceException; +import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.model.ChangeMessageVisibilityRequest; import com.github.rholder.retry.Retryer; import com.github.rholder.retry.RetryerBuilder; import com.mercateo.sqs.utils.message.handling.ErrorHandlingStrategy; @@ -33,7 +32,7 @@ @Slf4j public class VisibilityTimeoutExtender implements Runnable { - private final SqsAsyncClient sqsClient; + private final AmazonSQS sqsClient; private final ChangeMessageVisibilityRequest request; @@ -43,25 +42,23 @@ public class VisibilityTimeoutExtender implements Runnable { private final Retryer retryer; - VisibilityTimeoutExtender(@NonNull SqsAsyncClient sqsClient, @NonNull Duration newVisibilityTimeout, - @NonNull MessageWrapper messageWrapper, @NonNull String queueUrl, - @NonNull ErrorHandlingStrategy errorHandlingStrategy, - @NonNull RetryStrategy retryStrategy) { + VisibilityTimeoutExtender(@NonNull AmazonSQS sqsClient, @NonNull Duration newVisibilityTimeout, + @NonNull MessageWrapper messageWrapper, @NonNull String queueUrl, + @NonNull ErrorHandlingStrategy errorHandlingStrategy, + @NonNull RetryStrategy retryStrategy) { this.sqsClient = sqsClient; this.messageWrapper = messageWrapper; this.errorHandlingStrategy = errorHandlingStrategy; this.retryer = RetryerBuilder - . newBuilder() + .newBuilder() .retryIfException(t -> (t.getCause() instanceof UnknownHostException)) .withWaitStrategy(retryStrategy.getRetryWaitStrategy()) .withStopStrategy(retryStrategy.getRetryStopStrategy()) .build(); - request = ChangeMessageVisibilityRequest.builder() - .queueUrl(queueUrl) - .receiptHandle(messageWrapper.getReceiptHandle()) - .visibilityTimeout(timeoutInSeconds(newVisibilityTimeout)) - .build(); + request = new ChangeMessageVisibilityRequest().withQueueUrl(queueUrl).withReceiptHandle( + messageWrapper.getReceiptHandle()).withVisibilityTimeout( + timeoutInSeconds(newVisibilityTimeout)); } private Integer timeoutInSeconds(Duration timeout) { @@ -75,11 +72,13 @@ public void run() { messageWrapper.changeMessageVisibility(sqsClient, request); return null; }); - } catch (AwsServiceException e) { - errorHandlingStrategy.handleExtendVisibilityTimeoutException(e, messageWrapper); } catch (Exception e) { - log.error("error while extending message visibility for {}", messageWrapper.getMessageId(), e); - throw new RuntimeException(e); + if (e.getCause() instanceof AmazonServiceException) { + errorHandlingStrategy.handleExtendVisibilityTimeoutException((AmazonServiceException) e.getCause(), messageWrapper.getMessage()); + } else { + log.error("error while extending message visibility for " + messageWrapper.getMessageId(), e); + throw new RuntimeException(e); + } } } } \ No newline at end of file diff --git a/src/main/java/com/mercateo/sqs/utils/visibility/VisibilityTimeoutExtenderFactory.java b/src/main/java/com/mercateo/sqs/utils/visibility/VisibilityTimeoutExtenderFactory.java index 14d52b48..2c9d447c 100644 --- a/src/main/java/com/mercateo/sqs/utils/visibility/VisibilityTimeoutExtenderFactory.java +++ b/src/main/java/com/mercateo/sqs/utils/visibility/VisibilityTimeoutExtenderFactory.java @@ -1,12 +1,12 @@ /** * Copyright © 2017 Mercateo AG (http://www.mercateo.com) - *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -15,8 +15,7 @@ */ package com.mercateo.sqs.utils.visibility; -import software.amazon.awssdk.services.sqs.*; - +import com.amazonaws.services.sqs.AmazonSQS; import com.github.rholder.retry.StopStrategies; import com.github.rholder.retry.WaitStrategies; import com.mercateo.sqs.utils.message.handling.ErrorHandlingStrategy; @@ -26,23 +25,22 @@ import java.time.Duration; import java.util.concurrent.TimeUnit; -import jakarta.inject.Inject; -import jakarta.inject.Named; +import javax.inject.Inject; +import javax.inject.Named; import lombok.NonNull; @Named public class VisibilityTimeoutExtenderFactory { - private final SqsAsyncClient sqsClient; + private final AmazonSQS sqsClient; @Inject - public VisibilityTimeoutExtenderFactory(@NonNull SqsAsyncClient amazonSQS) { + public VisibilityTimeoutExtenderFactory(@NonNull AmazonSQS amazonSQS) { this.sqsClient = amazonSQS; } - public VisibilityTimeoutExtender get(@NonNull MessageWrapper messageWrapper, @NonNull Queue queue, - @NonNull ErrorHandlingStrategy errorHandlingStrategy) { + public VisibilityTimeoutExtender get(@NonNull MessageWrapper messageWrapper, @NonNull Queue queue, @NonNull ErrorHandlingStrategy errorHandlingStrategy) { Duration defaultVisibilityTimeout = queue.getDefaultVisibilityTimeout(); diff --git a/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/src/main/resources/META-INF/spring.factories similarity index 70% rename from src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports rename to src/main/resources/META-INF/spring.factories index 5c9d490c..1fd99d9f 100644 --- a/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports +++ b/src/main/resources/META-INF/spring.factories @@ -1,4 +1,5 @@ - com.mercateo.sqs.utils.message.handling.LongRunningMessageHandlerFactory - com.mercateo.sqs.utils.message.handling.MessageHandlingRunnableFactory - com.mercateo.sqs.utils.visibility.VisibilityTimeoutExtenderFactory +org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ + com.mercateo.sqs.utils.message.handling.LongRunningMessageHandlerFactory,\ + com.mercateo.sqs.utils.message.handling.MessageHandlingRunnableFactory,\ + com.mercateo.sqs.utils.visibility.VisibilityTimeoutExtenderFactory,\ com.mercateo.sqs.utils.queue.QueueFactory \ No newline at end of file diff --git a/src/test/java/com/mercateo/sqs/utils/message/handling/LogAndRethrowStrategyTest.java b/src/test/java/com/mercateo/sqs/utils/message/handling/LogAndRethrowStrategyTest.java index e85bde89..d2e01752 100644 --- a/src/test/java/com/mercateo/sqs/utils/message/handling/LogAndRethrowStrategyTest.java +++ b/src/test/java/com/mercateo/sqs/utils/message/handling/LogAndRethrowStrategyTest.java @@ -3,20 +3,22 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.catchThrowable; -import io.awspring.cloud.sqs.listener.acknowledgement.Acknowledgement; +import io.awspring.cloud.messaging.listener.Acknowledgment; + import java.util.HashMap; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; +import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.GenericMessage; public class LogAndRethrowStrategyTest { @Mock - private Acknowledgement acknowledgment; + private Acknowledgment acknowledgment; private DefaultErrorHandlingStrategy uut; @@ -27,23 +29,24 @@ public void setUp() throws Exception { } @Test - void handle_throws_exception() { + public void handle_throws_exception() { // Given Exception e = new IllegalArgumentException(); - MessageWrapper message = createMessage(); + Message message = createMessage(); // When Throwable throwable = catchThrowable(() -> uut.handleWorkerException(e, message)); // Then assertThat(throwable).isInstanceOf(IllegalArgumentException.class); + } - private MessageWrapper createMessage() { + private Message createMessage() { HashMap headerMap = new HashMap<>(); - headerMap.put("id", "mid"); + headerMap.put("MessageId", "mid"); headerMap.put("Acknowledgment", acknowledgment); - return new MessageWrapper<>(new GenericMessage<>(3, new MessageHeaders(headerMap))); + return new GenericMessage<>(3, new MessageHeaders(headerMap)); } } \ No newline at end of file diff --git a/src/test/java/com/mercateo/sqs/utils/message/handling/LongRunningMessageHandlerFactoryTest.java b/src/test/java/com/mercateo/sqs/utils/message/handling/LongRunningMessageHandlerFactoryTest.java index 78a596a3..5082fa07 100644 --- a/src/test/java/com/mercateo/sqs/utils/message/handling/LongRunningMessageHandlerFactoryTest.java +++ b/src/test/java/com/mercateo/sqs/utils/message/handling/LongRunningMessageHandlerFactoryTest.java @@ -1,12 +1,14 @@ package com.mercateo.sqs.utils.message.handling; -import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertEquals; import com.google.common.testing.NullPointerTester; import com.mercateo.sqs.utils.queue.QueueFactory; import com.mercateo.sqs.utils.queue.QueueName; import com.mercateo.sqs.utils.visibility.VisibilityTimeoutExtenderFactory; +import io.awspring.cloud.messaging.listener.SimpleMessageListenerContainer; + import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mock; @@ -26,21 +28,21 @@ public class LongRunningMessageHandlerFactoryTest { private LongRunningMessageHandlerFactory uut; @BeforeEach - void setUp() { + public void setUp() throws Exception { MockitoAnnotations.openMocks(this); + SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(); uut = new LongRunningMessageHandlerFactory(messageHandlingRunnableFactory, - timeoutExtenderFactory, queueFactory); + timeoutExtenderFactory, queueFactory, simpleMessageListenerContainer); } @Test - void testNullContracts() { + public void testNullContracts() throws Exception { // given NullPointerTester nullPointerTester = new NullPointerTester(); nullPointerTester.setDefault(QueueName.class, new QueueName("name")); nullPointerTester.setDefault(VisibilityTimeoutExtenderFactory.class, timeoutExtenderFactory); nullPointerTester.setDefault(QueueFactory.class, queueFactory); - nullPointerTester.setDefault(Integer.class, 10); // when nullPointerTester.testInstanceMethods(uut, NullPointerTester.Visibility.PACKAGE); @@ -48,14 +50,16 @@ void testNullContracts() { } @Test - void testConstructor_extractsTheCorrectMessageBatchSize() { + public void testConstructor_extractsTheCorrectMessageBatchSize() { // given - int expectedBatchSize = 8; + SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(); + simpleMessageListenerContainer.setMaxNumberOfMessages(2); // when - uut.setMaxConcurrentMessages(8); + uut = new LongRunningMessageHandlerFactory(messageHandlingRunnableFactory, + timeoutExtenderFactory, queueFactory, simpleMessageListenerContainer); // then - assertThat(uut.getMaxConcurrentMessages()).isEqualTo(expectedBatchSize); + assertEquals(2, uut.maxNumberOfMessagesPerBatch); } } \ No newline at end of file diff --git a/src/test/java/com/mercateo/sqs/utils/message/handling/LongRunningMessageHandlerIntegrationTest.java b/src/test/java/com/mercateo/sqs/utils/message/handling/LongRunningMessageHandlerIntegrationTest.java index 3ac0fe68..00d31307 100644 --- a/src/test/java/com/mercateo/sqs/utils/message/handling/LongRunningMessageHandlerIntegrationTest.java +++ b/src/test/java/com/mercateo/sqs/utils/message/handling/LongRunningMessageHandlerIntegrationTest.java @@ -3,20 +3,21 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.awaitility.Awaitility.await; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import com.amazonaws.services.sqs.AmazonSQS; import com.mercateo.sqs.utils.queue.Queue; import com.mercateo.sqs.utils.queue.QueueName; import com.mercateo.sqs.utils.visibility.VisibilityTimeoutExtenderFactory; import java.time.Duration; import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.UUID; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -27,37 +28,35 @@ import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.mockito.Spy; -import org.springframework.core.task.TaskRejectedException; +import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.GenericMessage; -import software.amazon.awssdk.services.sqs.SqsAsyncClient; -import software.amazon.awssdk.services.sqs.model.QueueAttributeName; public class LongRunningMessageHandlerIntegrationTest { - private final MessageHandlingRunnableFactory messageHandlingRunnableFactory = new MessageHandlingRunnableFactory(); + private MessageHandlingRunnableFactory messageHandlingRunnableFactory = new MessageHandlingRunnableFactory(); @Mock - private SqsAsyncClient sqsClient; + private AmazonSQS sqsClient; - private final MessageWorkerWithHeaders worker = new TestWorkerWithHeaders(); + private MessageWorkerWithHeaders worker = new TestWorkerWithHeaders(); @Mock private FinishedMessageCallback finishedMessageCallback; @Spy private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); - - @Spy + + @Mock private ErrorHandlingStrategy errorHandlingStrategy; private LongRunningMessageHandler uut; @BeforeEach - void setUp() { + public void setUp() throws Exception { MockitoAnnotations.openMocks(this); - Map attributes = new HashMap<>(); - attributes.put(QueueAttributeName.VISIBILITY_TIMEOUT, "10"); + Map attributes = new HashMap<>(); + attributes.put("VisibilityTimeout", "10"); Queue queue = new Queue(new QueueName("queueName"), "queueUrl", attributes); VisibilityTimeoutExtenderFactory timeoutExtenderFactory = new VisibilityTimeoutExtenderFactory( sqsClient); @@ -68,102 +67,92 @@ void setUp() { } @Test - void testHandleMessage_processesOneMessageAndReturns() { + public void testHandleMessage_processesOneMessageAndReturns() { // given - MessageWrapper message = createMessage(1); + Message message = createMessage(1); // when - Thread thread = new Thread(() -> uut.handleMessage(message.getMessage())); + Thread thread = new Thread(() -> uut.handleMessage(message)); thread.start(); // then await().until(() -> !thread.isAlive()); - await().until(() -> message.getMessage().getPayload().isRunning()); - assertThat(uut.getMessagesInProcessing().getBackingSet()).containsOnly(message.getMessageId()); + await().until(() -> message.getPayload().isRunning()); + assertThat(uut.getMessagesInProcessing().getBackingSet()).containsOnly("messageId1"); } @Test - void testHandleMessage_processesTwoMessagesAndBlocks() { + public void testHandleMessage_processesTwoMessagesAndBlocks() { // given - MessageWrapper message1 = createMessage(1); - MessageWrapper message2 = createMessage(2); - List messageIds = List.of(String.valueOf(message1.getMessageId()), - String.valueOf(message2.getMessageId())); + Message message1 = createMessage(1); + Message message2 = createMessage(2); - Thread thread1 = new Thread(() -> uut.handleMessage(message1.getMessage())); + Thread thread1 = new Thread(() -> uut.handleMessage(message1)); thread1.start(); await().until(() -> !thread1.isAlive()); // when - Thread thread2 = new Thread(() -> uut.handleMessage(message2.getMessage())); + Thread thread2 = new Thread(() -> uut.handleMessage(message2)); thread2.start(); // then await().until(() -> Thread.State.WAITING == thread2.getState()); - await().until(() -> message1.getMessage().getPayload().isRunning()); - await().until(() -> message2.getMessage().getPayload().isRunning()); - assertThat(uut.getMessagesInProcessing().getBackingSet()).containsExactlyInAnyOrderElementsOf(messageIds); + await().until(() -> message1.getPayload().isRunning()); + await().until(() -> message2.getPayload().isRunning()); + assertThat(uut.getMessagesInProcessing().getBackingSet()).containsOnly("messageId1", + "messageId2"); } @Test - void testHandleMessage_processesFourMessagesAndFillsQueue() { + public void testHandleMessage_processesFourMessagesAndFillsQueue() { // given - MessageWrapper message1 = createMessage(1); - MessageWrapper message2 = createMessage(2); - MessageWrapper message3 = createMessage(3); - MessageWrapper message4 = createMessage(4); - List messageIds = List.of(String.valueOf(message1.getMessageId()), - String.valueOf(message2.getMessageId()), - String.valueOf(message3.getMessageId()), - String.valueOf(message4.getMessageId())); - - new Thread(() -> uut.handleMessage(message1.getMessage())).start(); - new Thread(() -> uut.handleMessage(message2.getMessage())).start(); - await().until(() -> message1.getMessage().getPayload().isRunning()); - await().until(() -> message2.getMessage().getPayload().isRunning()); + Message message1 = createMessage(1); + Message message2 = createMessage(2); + Message message3 = createMessage(3); + Message message4 = createMessage(4); + + new Thread(() -> uut.handleMessage(message1)).start(); + new Thread(() -> uut.handleMessage(message2)).start(); + await().until(() -> message1.getPayload().isRunning()); + await().until(() -> message2.getPayload().isRunning()); // when - Thread thread3 = new Thread(() -> uut.handleMessage(message3.getMessage())); + Thread thread3 = new Thread(() -> uut.handleMessage(message3)); thread3.start(); - Thread thread4 = new Thread(() -> uut.handleMessage(message4.getMessage())); + Thread thread4 = new Thread(() -> uut.handleMessage(message4)); thread4.start(); // then await().until(() -> Thread.State.WAITING == thread3.getState()); await().until(() -> Thread.State.WAITING == thread4.getState()); - assertThat(message3.getMessage().getPayload().isRunning()).isFalse(); - assertThat(message4.getMessage().getPayload().isRunning()).isFalse(); - assertThat(uut.getMessagesInProcessing().getBackingSet()).containsExactlyInAnyOrderElementsOf(messageIds); + assertFalse(message3.getPayload().isRunning()); + assertFalse(message4.getPayload().isRunning()); + assertThat(uut.getMessagesInProcessing().getBackingSet()).containsOnly("messageId1", + "messageId2", "messageId3", "messageId4"); verify(scheduledExecutorService, times(4)).scheduleAtFixedRate(any(), anyLong(), anyLong(), any()); } @Test - void testHandleMessage_processesSixMessageAndCrashes() { + public void testHandleMessage_processesSixMessageAndCrashes() { // given - MessageWrapper message1 = createMessage(1); - MessageWrapper message2 = createMessage(2); - MessageWrapper message3 = createMessage(3); - MessageWrapper message4 = createMessage(4); - MessageWrapper message5 = createMessage(5); - MessageWrapper message6 = createMessage(6); - - List messageIds = List.of(message1.getMessageId(), - message2.getMessageId(), - message3.getMessageId(), - message4.getMessageId(), - message5.getMessageId()); - - new Thread(() -> uut.handleMessage(message1.getMessage())).start(); - new Thread(() -> uut.handleMessage(message2.getMessage())).start(); - await().until(() -> message1.getMessage().getPayload().isRunning()); - await().until(() -> message2.getMessage().getPayload().isRunning()); - - Thread thread3 = new Thread(() -> uut.handleMessage(message3.getMessage())); + Message message1 = createMessage(1); + Message message2 = createMessage(2); + Message message3 = createMessage(3); + Message message4 = createMessage(4); + Message message5 = createMessage(5); + Message message6 = createMessage(6); + + new Thread(() -> uut.handleMessage(message1)).start(); + new Thread(() -> uut.handleMessage(message2)).start(); + await().until(() -> message1.getPayload().isRunning()); + await().until(() -> message2.getPayload().isRunning()); + + Thread thread3 = new Thread(() -> uut.handleMessage(message3)); thread3.start(); - Thread thread4 = new Thread(() -> uut.handleMessage(message4.getMessage())); + Thread thread4 = new Thread(() -> uut.handleMessage(message4)); thread4.start(); - Thread thread5 = new Thread(() -> uut.handleMessage(message5.getMessage())); + Thread thread5 = new Thread(() -> uut.handleMessage(message5)); thread5.start(); await().until(() -> Thread.State.WAITING == thread3.getState()); @@ -171,79 +160,99 @@ void testHandleMessage_processesSixMessageAndCrashes() { await().until(() -> Thread.State.WAITING == thread5.getState()); // when - assertThatThrownBy(() -> uut.handleMessage(message6.getMessage())) - .hasCauseInstanceOf(TaskRejectedException.class); + assertThatThrownBy(() -> uut.handleMessage(message6)); + + // then + assertThat(uut.getMessagesInProcessing().getBackingSet()).containsOnly("messageId1", + "messageId2", "messageId3", "messageId4", "messageId5"); + } + + @Test + public void testHandleMessage_performsDeduplication() { + // given + Message message1_1 = createMessage(1); + Message message1_2 = createMessage(1); + + Thread thread1 = new Thread(() -> uut.handleMessage(message1_1)); + thread1.start(); + await().until(() -> !thread1.isAlive()); + + // when + Thread thread2 = new Thread(() -> uut.handleMessage(message1_2)); + thread2.start(); // then - assertThat(uut.getMessagesInProcessing().getBackingSet()).containsExactlyInAnyOrderElementsOf(messageIds); + await().until(() -> !thread2.isAlive()); + assertTrue(message1_1.getPayload().isRunning()); + assertFalse(message1_2.getPayload().isRunning()); + assertThat(uut.getMessagesInProcessing().getBackingSet()).containsOnly("messageId1"); + verify(scheduledExecutorService).scheduleAtFixedRate(any(), anyLong(), anyLong(), any()); } @Test - void testHandleMessage_startsQueuedProcess() { + public void testHandleMessage_startsQueuedProcess() { // given - MessageWrapper message1 = createMessage(1); - MessageWrapper message2 = createMessage(2); - MessageWrapper message3 = createMessage(3); - List messageIds = List.of(String.valueOf(message1.getMessageId()), - String.valueOf(message3.getMessageId())); - - new Thread(() -> uut.handleMessage(message1.getMessage())).start(); - await().until(() -> message1.getMessage().getPayload().isRunning()); - Thread thread2 = new Thread(() -> uut.handleMessage(message2.getMessage())); + Message message1 = createMessage(1); + Message message2 = createMessage(2); + Message message3 = createMessage(3); + + new Thread(() -> uut.handleMessage(message1)).start(); + await().until(() -> message1.getPayload().isRunning()); + Thread thread2 = new Thread(() -> uut.handleMessage(message2)); thread2.start(); - await().until(() -> message2.getMessage().getPayload().isRunning()); - Thread thread3 = new Thread(() -> uut.handleMessage(message3.getMessage())); + await().until(() -> message2.getPayload().isRunning()); + Thread thread3 = new Thread(() -> uut.handleMessage(message3)); thread3.start(); // when - message2.getMessage().getPayload().stop(); + message2.getPayload().stop(); // then await().until(() -> Thread.State.WAITING == thread2.getState()); await().until(() -> Thread.State.WAITING == thread3.getState()); - assertThat(message3.getMessage().getPayload().isRunning()).isTrue(); - assertThat(uut.getMessagesInProcessing().getBackingSet()).containsExactlyInAnyOrderElementsOf(messageIds); + assertTrue(message3.getPayload().isRunning()); + assertThat(uut.getMessagesInProcessing().getBackingSet()).containsOnly("messageId1", + "messageId3"); } @Test - void testHandleMessage_resumesWaitingThreads() { + public void testHandleMessage_resumesWaitingThreads() { // given - MessageWrapper message1 = createMessage(1); - MessageWrapper message2 = createMessage(2); - List messageId = List.of(String.valueOf(message2.getMessageId())); + Message message1 = createMessage(1); + Message message2 = createMessage(2); - Thread thread1 = new Thread(() -> uut.handleMessage(message1.getMessage())); + Thread thread1 = new Thread(() -> uut.handleMessage(message1)); thread1.start(); await().until(() -> !thread1.isAlive()); - Thread thread2 = new Thread(() -> uut.handleMessage(message2.getMessage())); + Thread thread2 = new Thread(() -> uut.handleMessage(message2)); thread2.start(); await().until(() -> Thread.State.WAITING == thread2.getState()); // when - message1.getMessage().getPayload().stop(); + message1.getPayload().stop(); // then await().until(() -> !thread2.isAlive()); - assertThat(uut.getMessagesInProcessing().getBackingSet()).containsExactlyElementsOf(messageId); + assertThat(uut.getMessagesInProcessing().getBackingSet()).containsOnly("messageId2"); } - private MessageWrapper createMessage(int number) { + private Message createMessage(int number) { Map headers = new HashMap<>(); - headers.put("id", UUID.fromString("bf308aa2-bf48-49b8-a839-61611c71043" + number).toString()); + headers.put("MessageId", "messageId" + number); headers.put("ReceiptHandle", "receiptHandle" + number); MessageHeaders messageHeaders = new MessageHeaders(headers); - return new MessageWrapper<>(new GenericMessage<>(new InputObject(), messageHeaders)); + return new GenericMessage<>(new InputObject(), messageHeaders); } private class TestWorkerWithHeaders implements MessageWorkerWithHeaders { @Override - public String work(InputObject object, MessageHeaders messageHeaders) { + public String work(InputObject object, MessageHeaders messageHeaders) throws Exception { object.start(); - await().until(object::isFinished); + await().until(() -> object.isFinished()); object.stop(); return "done"; diff --git a/src/test/java/com/mercateo/sqs/utils/message/handling/LongRunningMessageHandlerTest.java b/src/test/java/com/mercateo/sqs/utils/message/handling/LongRunningMessageHandlerTest.java index e04c60bb..5d7299fa 100644 --- a/src/test/java/com/mercateo/sqs/utils/message/handling/LongRunningMessageHandlerTest.java +++ b/src/test/java/com/mercateo/sqs/utils/message/handling/LongRunningMessageHandlerTest.java @@ -16,13 +16,13 @@ import java.time.Duration; import java.util.HashMap; import java.util.Map; -import java.util.UUID; import java.util.concurrent.ScheduledExecutorService; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; +import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.GenericMessage; @@ -52,7 +52,7 @@ public class LongRunningMessageHandlerTest { private LongRunningMessageHandler uut; @BeforeEach - void setUp() { + public void setUp() throws Exception { MockitoAnnotations.openMocks(this); when(queue.getName()).thenReturn(new QueueName("queuename")); when(queue.getDefaultVisibilityTimeout()).thenReturn(Duration.ofSeconds(120)); @@ -62,20 +62,19 @@ void setUp() { } @Test - void testNullContracts() { + public void testNullContracts() throws Exception { // given NullPointerTester nullPointerTester = new NullPointerTester(); nullPointerTester.setDefault(VisibilityTimeoutExtenderFactory.class, timeoutExtenderFactory); nullPointerTester.setDefault(Queue.class, queue); - + // when nullPointerTester.testInstanceMethods(uut, NullPointerTester.Visibility.PACKAGE); nullPointerTester.testConstructors(uut.getClass(), Visibility.PACKAGE); } - @Test - void timeUntilTimeOutExtensionTooLarge(){ + public void timeUntilTimeOutExtensionTooLarge() throws Exception { // when Throwable result = catchThrowable(() -> uut = new LongRunningMessageHandler<>(timeoutExtensionExecutor, 10, 2, @@ -86,8 +85,7 @@ void timeUntilTimeOutExtensionTooLarge(){ assertThat(result).isInstanceOf(IllegalStateException.class); } - @Test - void timeUntilTimeOutNegative(){ + public void timeUntilTimeOutNegative() throws Exception { // when Throwable result = catchThrowable(() -> uut = new LongRunningMessageHandler<>(timeoutExtensionExecutor, 10, 2, @@ -99,26 +97,25 @@ void timeUntilTimeOutNegative(){ } @Test - void testHandleMessage_handlesExceptionDuringTimeoutExtension() { + public void testHandleMessage_handlesExceptionDuringTimeoutExtension() { // given - MessageWrapper message = createMessage(); + Message message = createMessage(); RuntimeException exception = new RuntimeException("test exception"); when(timeoutExtensionExecutor.scheduleAtFixedRate(any(), anyLong(), anyLong(), any())) .thenThrow(exception); // when - assertThatThrownBy(() -> uut.handleMessage(message.getMessage())).hasCause(exception); + assertThatThrownBy(() -> uut.handleMessage(message)).hasCause(exception); // then assertThat(uut.getMessagesInProcessing().getBackingSet()).isEmpty(); } - private MessageWrapper createMessage() { + private Message createMessage() { Map headers = new HashMap<>(); - String messageId = UUID.fromString("bf308aa2-bf48-49b8-a839-61611c710430").toString(); - headers.put("id", messageId); + headers.put("MessageId", "messageId"); MessageHeaders messageHeaders = new MessageHeaders(headers); - return new MessageWrapper<>(new GenericMessage<>(1, messageHeaders)); + return new GenericMessage<>(1, messageHeaders); } } \ No newline at end of file diff --git a/src/test/java/com/mercateo/sqs/utils/message/handling/MessageHandlingRunnableTest.java b/src/test/java/com/mercateo/sqs/utils/message/handling/MessageHandlingRunnableTest.java index 4cda660d..a2d0a9b2 100644 --- a/src/test/java/com/mercateo/sqs/utils/message/handling/MessageHandlingRunnableTest.java +++ b/src/test/java/com/mercateo/sqs/utils/message/handling/MessageHandlingRunnableTest.java @@ -8,19 +8,17 @@ import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; -import com.google.common.testing.NullPointerTester; +import io.awspring.cloud.messaging.listener.Acknowledgment; import java.util.HashMap; -import java.util.UUID; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; import java.util.concurrent.ScheduledFuture; -import io.awspring.cloud.sqs.MessagingHeaders; -import io.awspring.cloud.sqs.listener.acknowledgement.AcknowledgementCallback; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; +import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.GenericMessage; @@ -31,9 +29,9 @@ class MessageHandlingRunnableTest { private MessageWorkerWithHeaders worker; @Mock - private AcknowledgementCallback acknowledgment; + private Acknowledgment acknowledgment; - private MessageWrapper message; + private Message message; @Mock private FinishedMessageCallback finishedMessageCallback; @@ -41,8 +39,6 @@ class MessageHandlingRunnableTest { @Mock private SetWithUpperBound messages; - private UUID messageGeneratedUUID; - @Mock private ScheduledFuture visibilityTimeoutExtender; @@ -52,52 +48,38 @@ class MessageHandlingRunnableTest { private MessageHandlingRunnable uut; @BeforeEach - public void setUp() { + public void setUp() throws Exception { MockitoAnnotations.openMocks(this); HashMap headerMap = new HashMap<>(); - headerMap.put("id", "bf308aa2-bf48-49b8-a839-61611c710431"); - headerMap.put(MessagingHeaders.ACKNOWLEDGMENT_CALLBACK_HEADER, acknowledgment); - message = new MessageWrapper<>(new GenericMessage<>(3, new MessageHeaders(headerMap))); - messageGeneratedUUID = message.getMessage().getHeaders().getId(); - uut = new MessageHandlingRunnable<>(worker, message, finishedMessageCallback, messages, + headerMap.put("MessageId", "mid"); + headerMap.put("Acknowledgment", acknowledgment); + message = new GenericMessage<>(3, new MessageHeaders(headerMap)); + uut = new MessageHandlingRunnable<>(worker, new MessageWrapper<>(message), finishedMessageCallback, messages, visibilityTimeoutExtender, errorHandlingStrategy); } - @Test - void testNullContracts() { - // given - NullPointerTester nullPointerTester = new NullPointerTester(); - nullPointerTester.setDefault(MessageWrapper.class, message); - nullPointerTester.setDefault(SetWithUpperBound.class, messages); - - // when - nullPointerTester.testInstanceMethods(uut, NullPointerTester.Visibility.PACKAGE); - nullPointerTester.testAllPublicConstructors(uut.getClass()); - } - @SuppressWarnings("unchecked") @Test void testRun() throws Throwable { // given - when(worker.work(3, message.getMessage().getHeaders())).thenReturn("3S"); - when(acknowledgment.onAcknowledge(message.getMessage())) - .thenReturn(mock(CompletableFuture.class)); + when(worker.work(3, message.getHeaders())).thenReturn("3S"); + when(acknowledgment.acknowledge()).thenReturn(mock(Future.class)); // when uut.run(); // then verify(finishedMessageCallback).call(3, "3S"); - verify(acknowledgment).onAcknowledge(message.getMessage()); + verify(acknowledgment).acknowledge(); verify(visibilityTimeoutExtender).cancel(false); - verify(messages).remove(messageGeneratedUUID.toString()); + verify(messages).remove("mid"); } @Test void testRun_throws_workerException_and_does_not_ack() throws Throwable { // given Exception e = new IllegalArgumentException(); - doThrow(e).when(worker).work(3, message.getMessage().getHeaders()); + doThrow(e).when(worker).work(3, message.getHeaders()); doThrow(e).when(errorHandlingStrategy).handleWorkerException(e, message); // when @@ -109,7 +91,7 @@ void testRun_throws_workerException_and_does_not_ack() throws Throwable { assertThat(result).isEqualTo(e); verify(errorHandlingStrategy).handleWorkerException(e, message); verify(visibilityTimeoutExtender).cancel(false); - verify(messages).remove(messageGeneratedUUID.toString()); + verify(messages).remove("mid"); } @SuppressWarnings("unchecked") @@ -117,17 +99,16 @@ void testRun_throws_workerException_and_does_not_ack() throws Throwable { void testRun_throws_workerException_and_acks() throws Throwable { // given Exception e = new IllegalArgumentException(); - doThrow(e).when(worker).work(3, message.getMessage().getHeaders()); - when(acknowledgment.onAcknowledge(message.getMessage())) - .thenReturn(mock(CompletableFuture.class)); + doThrow(e).when(worker).work(3, message.getHeaders()); + when(acknowledgment.acknowledge()).thenReturn(mock(Future.class)); // when uut.run(); // then verify(errorHandlingStrategy).handleWorkerException(e, message); - verify(acknowledgment).onAcknowledge(message.getMessage()); + verify(acknowledgment).acknowledge(); verify(visibilityTimeoutExtender).cancel(false); - verify(messages).remove(messageGeneratedUUID.toString()); + verify(messages).remove("mid"); } } \ No newline at end of file diff --git a/src/test/java/com/mercateo/sqs/utils/message/handling/MessageWorkerTest.java b/src/test/java/com/mercateo/sqs/utils/message/handling/MessageWorkerTest.java index 69f9b622..057c9704 100644 --- a/src/test/java/com/mercateo/sqs/utils/message/handling/MessageWorkerTest.java +++ b/src/test/java/com/mercateo/sqs/utils/message/handling/MessageWorkerTest.java @@ -1,6 +1,6 @@ package com.mercateo.sqs.utils.message.handling; -import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verifyNoInteractions; @@ -12,7 +12,7 @@ public class MessageWorkerTest { @Test - void testWorkDelegatesMethodCall() throws Exception { + public void testWorkDelegatesMethodCall() throws Exception { // given AtomicInteger counter = new AtomicInteger(0); MessageWorker uut = new MessageWorker() { @@ -27,7 +27,7 @@ public Integer work(String object) { uut.work("dummy value", messageHeaders); // then - assertThat(counter.intValue()).isEqualTo(counter.intValue()); + assertEquals(1, counter.intValue()); verifyNoInteractions(messageHeaders); } } \ No newline at end of file diff --git a/src/test/java/com/mercateo/sqs/utils/message/handling/SetWithUpperBoundTest.java b/src/test/java/com/mercateo/sqs/utils/message/handling/SetWithUpperBoundTest.java index 80592e59..884ec54d 100644 --- a/src/test/java/com/mercateo/sqs/utils/message/handling/SetWithUpperBoundTest.java +++ b/src/test/java/com/mercateo/sqs/utils/message/handling/SetWithUpperBoundTest.java @@ -1,6 +1,7 @@ package com.mercateo.sqs.utils.message.handling; -import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import com.google.common.testing.NullPointerTester; @@ -13,7 +14,7 @@ public class SetWithUpperBoundTest { @Test - void testNullContracts(){ + public void testNullContracts() throws Exception { // given SetWithUpperBound uut = new SetWithUpperBound<>(4); NullPointerTester nullPointerTester = new NullPointerTester(); @@ -24,7 +25,7 @@ void testNullContracts(){ } @Test - void testContains_contains() { + public void testContains_contains() { // given SetWithUpperBound setWithUpperBound = new SetWithUpperBound<>(2); setWithUpperBound.add("1"); @@ -34,11 +35,11 @@ void testContains_contains() { boolean contains = setWithUpperBound.contains("2"); // then - assertThat(contains).isTrue(); + assertTrue(contains); } @Test - void testContains_doesNotContain() { + public void testContains_doesNotContain() { // given SetWithUpperBound setWithUpperBound = new SetWithUpperBound<>(2); setWithUpperBound.add("1"); @@ -48,11 +49,11 @@ void testContains_doesNotContain() { boolean contains = setWithUpperBound.contains("3"); // then - assertThat(contains).isFalse(); + assertFalse(contains); } @Test - void testRemove() { + public void testRemove() { // given SetWithUpperBound setWithUpperBound = new SetWithUpperBound<>(2); setWithUpperBound.add("1"); @@ -62,11 +63,11 @@ void testRemove() { setWithUpperBound.remove("2"); // then - assertThat(setWithUpperBound.contains("2")).isFalse(); + assertFalse(setWithUpperBound.contains("2")); } @Test - void testWaitUntilAtLeastOneFree_notifyAndWaitWorking() throws InterruptedException { + public void testWaitUntilAtLeastOneFree_notifyAndWaitWorking() throws InterruptedException { // given CountDownLatch waitingThreads = new CountDownLatch(2); CountDownLatch waitingThreadsToBeStarted = new CountDownLatch(2); @@ -97,6 +98,6 @@ void testWaitUntilAtLeastOneFree_notifyAndWaitWorking() throws InterruptedExcept }).start(); // then - assertThat(waitingThreads.await(100, TimeUnit.MILLISECONDS)).isTrue(); + assertTrue(waitingThreads.await(100, TimeUnit.MILLISECONDS)); } } \ No newline at end of file diff --git a/src/test/java/com/mercateo/sqs/utils/queue/QueueFactoryTest.java b/src/test/java/com/mercateo/sqs/utils/queue/QueueFactoryTest.java index 12c6d98a..ff8c67d2 100644 --- a/src/test/java/com/mercateo/sqs/utils/queue/QueueFactoryTest.java +++ b/src/test/java/com/mercateo/sqs/utils/queue/QueueFactoryTest.java @@ -1,37 +1,39 @@ package com.mercateo.sqs.utils.queue; -import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.model.GetQueueAttributesRequest; +import com.amazonaws.services.sqs.model.GetQueueAttributesResult; +import com.amazonaws.services.sqs.model.GetQueueUrlRequest; +import com.amazonaws.services.sqs.model.GetQueueUrlResult; import com.google.common.testing.NullPointerTester; import java.util.HashMap; -import java.util.concurrent.CompletableFuture; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; -import software.amazon.awssdk.services.sqs.SqsAsyncClient; -import software.amazon.awssdk.services.sqs.model.*; public class QueueFactoryTest { @Mock - private SqsAsyncClient amazonSQS; + private AmazonSQS amazonSQS; private QueueFactory uut; @BeforeEach - void setUp() throws Exception { + public void setUp() throws Exception { MockitoAnnotations.openMocks(this); uut = new QueueFactory(amazonSQS); } @Test - void testNullContracts() throws Exception { + public void testNullContracts() throws Exception { // given NullPointerTester nullPointerTester = new NullPointerTester(); @@ -41,32 +43,27 @@ void testNullContracts() throws Exception { } @Test - void testGet() { + public void testGet() { // given QueueName qn = new QueueName("q1"); - GetQueueUrlResponse queueUrlResult = mock(GetQueueUrlResponse.class); - when(queueUrlResult.queueUrl()).thenReturn("url1"); - CompletableFuture mockGetQueueUrlResult = new CompletableFuture<>(); - mockGetQueueUrlResult.complete(queueUrlResult); - - GetQueueAttributesResponse attributesResult = mock(GetQueueAttributesResponse.class); - HashMap attributes = new HashMap<>(); - attributes.put(QueueAttributeName.fromValue("1"), "3"); - attributes.put(QueueAttributeName.fromValue("hi"), "ho"); - CompletableFuture mockGetQueueAttributesResult = new CompletableFuture<>(); - mockGetQueueAttributesResult.complete(attributesResult); - - when(attributesResult.attributes()).thenReturn(attributes); - when(amazonSQS.getQueueUrl(any(GetQueueUrlRequest.class))).thenReturn(mockGetQueueUrlResult); + GetQueueUrlResult queueUrlResult = mock(GetQueueUrlResult.class); + when(queueUrlResult.getQueueUrl()).thenReturn("url1"); + GetQueueAttributesResult attributesResult = mock(GetQueueAttributesResult.class); + HashMap attributes = new HashMap<>(); + attributes.put("1", "3"); + attributes.put("hi", "ho"); + when(attributesResult.getAttributes()).thenReturn(attributes); + when(amazonSQS.getQueueUrl(any(GetQueueUrlRequest.class))).thenReturn(queueUrlResult); when(amazonSQS.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn( - mockGetQueueAttributesResult); + attributesResult); // when Queue queue = uut.get(qn); // then - assertThat(queue.getUrl()).isEqualTo("url1"); - assertThat(queue.getName().getId()).isEqualTo("q1"); - assertThat(queue.getQueueAttributes()).isEqualTo(attributes); + assertEquals("url1", queue.getUrl()); + assertEquals("q1", queue.getName().getId()); + assertEquals(attributes, queue.getQueueAttributes()); + } } \ No newline at end of file diff --git a/src/test/java/com/mercateo/sqs/utils/queue/QueueTest.java b/src/test/java/com/mercateo/sqs/utils/queue/QueueTest.java index b1aee550..bb6c8a78 100644 --- a/src/test/java/com/mercateo/sqs/utils/queue/QueueTest.java +++ b/src/test/java/com/mercateo/sqs/utils/queue/QueueTest.java @@ -1,6 +1,6 @@ package com.mercateo.sqs.utils.queue; -import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertEquals; import com.google.common.testing.NullPointerTester; @@ -12,23 +12,22 @@ import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.MockitoAnnotations; -import software.amazon.awssdk.services.sqs.model.QueueAttributeName; public class QueueTest { @Mock - private Map queueAttributes; + private Map queueAttributes; private Queue uut; @BeforeEach - void setUp() { + public void setUp() throws Exception { MockitoAnnotations.openMocks(this); uut = new Queue(new QueueName("123"), "http://url.de", queueAttributes); } @Test - void testNullContracts() throws Exception { + public void testNullContracts() throws Exception { // given NullPointerTester nullPointerTester = new NullPointerTester(); nullPointerTester.ignore(uut.getClass().getDeclaredMethod("canEqual", Object.class)); @@ -41,14 +40,14 @@ void testNullContracts() throws Exception { } @Test - void testGetDefaultVisibilityTimeout() { + public void testGetDefaultVisibilityTimeout() { // given - Mockito.when(queueAttributes.get(QueueAttributeName.VISIBILITY_TIMEOUT)).thenReturn("734"); + Mockito.when(queueAttributes.get("VisibilityTimeout")).thenReturn("734"); // when Duration defaultVisibilityTimeout = uut.getDefaultVisibilityTimeout(); // then - assertThat(defaultVisibilityTimeout.getSeconds()).isEqualTo(734); + assertEquals(734, defaultVisibilityTimeout.getSeconds()); } } \ No newline at end of file diff --git a/src/test/java/com/mercateo/sqs/utils/visibility/VisibilityTimeoutExtenderTest.java b/src/test/java/com/mercateo/sqs/utils/visibility/VisibilityTimeoutExtenderTest.java index e82cf430..e56eb973 100644 --- a/src/test/java/com/mercateo/sqs/utils/visibility/VisibilityTimeoutExtenderTest.java +++ b/src/test/java/com/mercateo/sqs/utils/visibility/VisibilityTimeoutExtenderTest.java @@ -7,6 +7,9 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.amazonaws.SdkClientException; +import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.model.ChangeMessageVisibilityRequest; import com.github.rholder.retry.RetryException; import com.github.rholder.retry.StopStrategies; import com.github.rholder.retry.WaitStrategies; @@ -17,7 +20,6 @@ import java.net.UnknownHostException; import java.time.Duration; import java.util.HashMap; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.BeforeEach; @@ -27,36 +29,32 @@ import org.mockito.MockitoAnnotations; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.GenericMessage; -import software.amazon.awssdk.core.exception.SdkClientException; -import software.amazon.awssdk.services.sqs.SqsAsyncClient; -import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest; -import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityResponse; class VisibilityTimeoutExtenderTest { private VisibilityTimeoutExtender uut; @Mock - private SqsAsyncClient sqsClient; + private AmazonSQS sqsClient; @Mock private ErrorHandlingStrategy errorHandlingStrategy; @BeforeEach - void setUp() { + public void setUp() throws Exception { MockitoAnnotations.openMocks(this); HashMap headerMap = new HashMap<>(); headerMap.put("ReceiptHandle", "rhd"); - MessageWrapper message = new MessageWrapper<>(new GenericMessage<>(new Object(), new MessageHeaders( - headerMap))); + GenericMessage message = new GenericMessage<>(new Object(), new MessageHeaders(headerMap)); + MessageWrapper messageWrapper = new MessageWrapper<>(message); RetryStrategy retryStrategy = new RetryStrategy(WaitStrategies.fixedWait(1, TimeUnit.MICROSECONDS), StopStrategies.stopAfterAttempt(5)); - uut = new VisibilityTimeoutExtender(sqsClient, Duration.ofSeconds(10*60), message, "queue", + uut = new VisibilityTimeoutExtender(sqsClient, Duration.ofMinutes(10), messageWrapper, "queue", errorHandlingStrategy, retryStrategy); } @Test - void testNullContracts() { + void testNullContracts() throws Exception { // given NullPointerTester nullPointerTester = new NullPointerTester(); @@ -68,9 +66,6 @@ void testNullContracts() { @Test void testRun() { // given - CompletableFuture future = new CompletableFuture<>(); - future.complete(ChangeMessageVisibilityResponse.builder().build()); - when(sqsClient.changeMessageVisibility(any(ChangeMessageVisibilityRequest.class))).thenReturn(future); // when uut.run(); @@ -81,20 +76,19 @@ void testRun() { verify(sqsClient).changeMessageVisibility(captor.capture()); ChangeMessageVisibilityRequest request = captor.getValue(); - assertThat(request.receiptHandle()).isEqualTo("rhd"); - assertThat(request.queueUrl()).isEqualTo("queue"); - assertThat(request.visibilityTimeout().intValue()).isEqualTo(600); + assertThat(request.getReceiptHandle()).isEqualTo("rhd"); + assertThat(request.getQueueUrl()).isEqualTo("queue"); + assertThat(request.getVisibilityTimeout().intValue()).isEqualTo(600); } @Test void retryForUnknownHostException() { - SdkClientException sdkClientException = - SdkClientException.builder().cause(new UnknownHostException()).build(); + SdkClientException sdkClientException = new SdkClientException("foo", new UnknownHostException()); // given - when(sqsClient.changeMessageVisibility(any(ChangeMessageVisibilityRequest.class))) + when(sqsClient.changeMessageVisibility(any())) .thenThrow(sdkClientException); // when Throwable result = catchThrowable(() -> uut.run()); @@ -102,22 +96,22 @@ void retryForUnknownHostException() { // then assertThat(result).isInstanceOf(RuntimeException.class); assertThat(result.getCause()).isInstanceOf(RetryException.class); - verify(sqsClient, times(5)).changeMessageVisibility(any(ChangeMessageVisibilityRequest.class)); + verify(sqsClient, times(5)).changeMessageVisibility(any()); } @Test void dontRetryForSdkClientExceptionsInGeneral() { - SdkClientException sdkClientException = SdkClientException.builder().build(); + SdkClientException sdkClientException = new SdkClientException("foo"); // given - when(sqsClient.changeMessageVisibility(any(ChangeMessageVisibilityRequest.class))).thenThrow(sdkClientException); + when(sqsClient.changeMessageVisibility(any())).thenThrow(sdkClientException); // when Throwable result = catchThrowable(() -> uut.run()); // then assertThat(result).isInstanceOf(RuntimeException.class); - verify(sqsClient, times(1)).changeMessageVisibility(any(ChangeMessageVisibilityRequest.class)); + verify(sqsClient, times(1)).changeMessageVisibility(any()); } } \ No newline at end of file