Skip to content
This repository has been archived by the owner on Sep 6, 2024. It is now read-only.

Revert "update to spring boot 3" #26

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/Build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:
- name: Set up JDK
uses: actions/setup-java@v3
with:
java-version: '17'
java-version: '8'
distribution: 'temurin'
cache: maven
- name: verify
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/Deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
- name: Set up JDK
uses: actions/setup-java@v3
with:
java-version: '17'
java-version: '8'
distribution: 'temurin'
cache: maven
server-id: ossrh
Expand Down
6 changes: 0 additions & 6 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,3 @@
# 1.0.0

* maximum amount of messages per batch must be set manually if custom value is preferred (default remains 10)
* compatible with Spring Boot 3.1.*
* use `S3AsyncClient` instead of synchronous `S3Client`

# 0.8.1

* pass MDC context to workers
Expand Down
60 changes: 21 additions & 39 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<artifactId>sqs-utils</artifactId>
<groupId>com.mercateo.sqs</groupId>
<name>sqs-utils</name>
<version>1.0.0-SNAPSHOT</version>
<version>0.9.1-SNAPSHOT</version>
<packaging>jar</packaging>

<description>Some utility classes for AWS SQS</description>
Expand Down Expand Up @@ -39,7 +39,7 @@
<connection>scm:git:[email protected]:Mercateo/sqs-utils.git</connection>
<developerConnection>scm:git:[email protected]:Mercateo/sqs-utils.git</developerConnection>
<url>https://github.com/Mercateo/sqs-utils.git</url>
<tag>HEAD</tag>
<tag>0.8.0</tag>
</scm>

<developers>
Expand All @@ -59,21 +59,13 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<jackson.version>2.14.2</jackson.version>
<aws-sdk-sqs.version>2.27.17</aws-sdk-sqs.version>
<spring-cloud-aws.version>3.0.4</spring-cloud-aws.version>
<aws-sdk-sqs.version>1.12.406</aws-sdk-sqs.version>
<slf4j.version>2.0.6</slf4j.version>
<junit.version>5.9.2</junit.version>
</properties>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>bom</artifactId>
<version>${aws-sdk-sqs.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson</groupId>
<artifactId>jackson-bom</artifactId>
Expand All @@ -93,42 +85,31 @@

<dependencies>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sqs</artifactId>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sqs</artifactId>
<version>${aws-sdk-sqs.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.34</version>
<version>1.18.26</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws-dependencies</artifactId>
<version>${spring-cloud-aws.version}</version>
<type>pom</type>
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>
<version>1</version>
</dependency>
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws-sqs</artifactId>
<version>${spring-cloud-aws.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-messaging</artifactId>
<version>6.0.11</version>
</dependency>
<dependency>
<groupId>jakarta.inject</groupId>
<artifactId>jakarta.inject-api</artifactId>
<version>2.0.0</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws-messaging</artifactId>
<version>2.4.4</version>
</dependency>
<dependency>
<groupId>com.github.rholder</groupId>
<artifactId>guava-retrying</artifactId>
Expand Down Expand Up @@ -157,7 +138,8 @@
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>5.12.0</version>
<!-- TODO: mockito 5 requires newer java version, do later -->
<version>4.11.0</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down Expand Up @@ -193,11 +175,11 @@
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.13.0</version>
<version>3.6.1</version>
<configuration>
<compilerVersion>17</compilerVersion>
<source>17</source>
<target>17</target>
<compilerVersion>1.8</compilerVersion>
<source>1.8</source>
<target>1.8</target>
<debug>true</debug>
<showDeprecation>true</showDeprecation>
<showWarnings>true</showWarnings>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,45 +15,46 @@
*/
package com.mercateo.sqs.utils.message.handling;

import java.util.Objects;
import com.amazonaws.AmazonServiceException;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import software.amazon.awssdk.awscore.exception.AwsServiceException;
import org.springframework.messaging.Message;

@Slf4j
class DefaultErrorHandlingStrategy<I> implements ErrorHandlingStrategy<I> {

@Override
@SneakyThrows
public void handleWorkerException(Exception e, MessageWrapper<I> message) {
log.error("error while handling message " + message.getMessageId() + ": " + message.getMessage().getPayload(), e);
public void handleWorkerException(Exception e, Message<I> message) {
String messageId = message.getHeaders().get("MessageId", String.class);
log.error("error while handling message " + messageId + ": " + message.getPayload(), e);
throw e;
}

@Override
@SneakyThrows
public void handleWorkerThrowable(Throwable t, MessageWrapper<I> message) {
String messageId = String.valueOf(message.getMessageId());
log.error("error while handling message " + messageId + ": " + message.getMessage().getPayload(), t);
public void handleWorkerThrowable(Throwable t, Message<I> message) {
String messageId = message.getHeaders().get("MessageId", String.class);
log.error("error while handling message " + messageId + ": " + message.getPayload(), t);
throw t;
}

@Override
public void handleExtendVisibilityTimeoutException(AwsServiceException e,
MessageWrapper<?> message) {
public void handleExtendVisibilityTimeoutException(AmazonServiceException e,
Message<?> message) {

String msg = "error while extending message visibility for " + Objects.requireNonNull(
message.getMessageId());
String msg = "error while extending message visibility for " + message.getHeaders().get("MessageId",
String.class);
log.error(msg, e);
throw e;

}

@Override
public void handleAcknowledgeMessageException(AwsServiceException e, MessageWrapper<I> message) {
String messageId = message.getMessageId();
public void handleAcknowledgeMessageException(AmazonServiceException e, Message<I> message) {
String messageId = message.getHeaders().get("MessageId", String.class);
log.error("could not acknowledge " + messageId, e);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
*/
package com.mercateo.sqs.utils.message.handling;

import software.amazon.awssdk.awscore.exception.AwsServiceException;
import com.amazonaws.AmazonServiceException;

import org.springframework.messaging.Message;

public interface ErrorHandlingStrategy<I> {

Expand All @@ -28,7 +30,7 @@ public interface ErrorHandlingStrategy<I> {
* @param message
* that was incorrectly processed
*/
void handleWorkerException(Exception e, MessageWrapper<I> message);
void handleWorkerException(Exception e, Message<I> message);

/**
* Defines how a throwable, that is thrown by the worker are handled. If a
Expand All @@ -39,7 +41,7 @@ public interface ErrorHandlingStrategy<I> {
* @param message
* that was incorrectly processed
*/
void handleWorkerThrowable(Throwable t, MessageWrapper<I> message);
void handleWorkerThrowable(Throwable t, Message<I> message);

/**
* Defines how exceptions, that are thrown by the timeout extension are handled.
Expand All @@ -50,7 +52,7 @@ public interface ErrorHandlingStrategy<I> {
* @param message that was tried to extend
*/

void handleExtendVisibilityTimeoutException(AwsServiceException e, MessageWrapper<?> message);
void handleExtendVisibilityTimeoutException(AmazonServiceException e, Message<?> message);

/**
* Defines how exceptions, that are thrown by the message acknowledgement are handled.
Expand All @@ -61,6 +63,8 @@ public interface ErrorHandlingStrategy<I> {
* @param message that was tried to extend
*/

void handleAcknowledgeMessageException(AwsServiceException e, MessageWrapper<I> message);
void handleAcknowledgeMessageException(AmazonServiceException e, Message<I> message);



}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import com.mercateo.sqs.utils.visibility.VisibilityTimeoutExtenderFactory;

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
Expand All @@ -27,6 +29,7 @@
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import org.slf4j.MDC;
import org.springframework.messaging.Message;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

Expand Down Expand Up @@ -78,7 +81,24 @@ public class LongRunningMessageHandler<I, O> {
this.awaitShutDown = awaitShutDown;
this.errorHandlingStrategy = errorHandlingStrategy;

messageProcessingExecutor = new ThreadPoolTaskExecutor();
messageProcessingExecutor = new ThreadPoolTaskExecutor() {
@Override
public Future<?> submit(Runnable task) {
Map<String, String> current = MDC.getCopyOfContextMap();
return super.submit(() -> {
try {
if (current != null) {
current.forEach(MDC::put);
}
task.run();
} finally {
if (current != null) {
current.keySet().forEach(MDC::remove);
}
}
});
}
};
messageProcessingExecutor.setCorePoolSize(numberOfThreads);
messageProcessingExecutor.setMaxPoolSize(numberOfThreads);
messageProcessingExecutor.setThreadNamePrefix(getClass().getSimpleName()+"-"+queue.getName().getId()+"-");
Expand Down Expand Up @@ -131,7 +151,6 @@ public class LongRunningMessageHandler<I, O> {
public void handleMessage(@NonNull Message<I> message) {
MessageWrapper<I> messageWrapper = new MessageWrapper<>(message);
String messageId = messageWrapper.getMessageId();

if (messagesInProcessing.contains(messageId)) {
return;
}
Expand Down Expand Up @@ -173,17 +192,17 @@ public int getFreeWorkerCapacity() {
return messagesInProcessing.free();
}

private void scheduleNewMessageTask(@NonNull MessageWrapper<I> message,
private void scheduleNewMessageTask(@NonNull MessageWrapper<I> messageWrapper,
ScheduledFuture<?> visibilityTimeoutExtender) {
MessageHandlingRunnable<I, O> messageTask = messageHandlingRunnableFactory.get(worker,
message, finishedMessageCallback, messagesInProcessing, visibilityTimeoutExtender, errorHandlingStrategy);
messageWrapper, finishedMessageCallback, messagesInProcessing, visibilityTimeoutExtender, errorHandlingStrategy);

messageProcessingExecutor.submit(messageTask);
}

private ScheduledFuture<?> scheduleNewVisibilityTimeoutExtender(@NonNull MessageWrapper<I> message) {
private ScheduledFuture<?> scheduleNewVisibilityTimeoutExtender(@NonNull MessageWrapper<I> messageWrapper) {
return timeoutExtensionExecutor.scheduleAtFixedRate(
timeoutExtenderFactory.get(message, queue, errorHandlingStrategy),
timeoutExtenderFactory.get(messageWrapper, queue, errorHandlingStrategy),
timeUntilVisibilityTimeoutExtension.toMillis(),
timeUntilVisibilityTimeoutExtension.toMillis(),
TimeUnit.MILLISECONDS);
Expand Down
Loading
Loading