From 78d439c17c1b52ff524028815193f845712bffd5 Mon Sep 17 00:00:00 2001 From: "Stephan.Praetsch" Date: Wed, 17 Apr 2024 13:48:24 +0200 Subject: [PATCH] provide MDC context map to submitted task --- CHANGELOG.md | 4 ++++ pom.xml | 2 +- .../handling/LongRunningMessageHandler.java | 14 +++++++++++++- 3 files changed, 18 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 85fc8412..b7e5017e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.8.0 + +* pass MDC context to workers + # 0.7.1 * adds the possibility to configure a handling in case of a `Throwable` in `ErrorHandlingStrategy` diff --git a/pom.xml b/pom.xml index 3fbf4a4b..6c4ad21f 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ sqs-utils com.mercateo.sqs sqs-utils - 0.7.5-SNAPSHOT + 0.8.0-SNAPSHOT jar Some utility classes for AWS SQS diff --git a/src/main/java/com/mercateo/sqs/utils/message/handling/LongRunningMessageHandler.java b/src/main/java/com/mercateo/sqs/utils/message/handling/LongRunningMessageHandler.java index e26dd194..bf41e82f 100644 --- a/src/main/java/com/mercateo/sqs/utils/message/handling/LongRunningMessageHandler.java +++ b/src/main/java/com/mercateo/sqs/utils/message/handling/LongRunningMessageHandler.java @@ -20,6 +20,8 @@ import com.mercateo.sqs.utils.visibility.VisibilityTimeoutExtenderFactory; import java.time.Duration; +import java.util.Map; +import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -28,6 +30,7 @@ import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; +import org.slf4j.MDC; import org.springframework.messaging.Message; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; @@ -79,7 +82,16 @@ public class LongRunningMessageHandler { this.awaitShutDown = awaitShutDown; this.errorHandlingStrategy = errorHandlingStrategy; - messageProcessingExecutor = new ThreadPoolTaskExecutor(); + messageProcessingExecutor = new ThreadPoolTaskExecutor() { + @Override + public Future submit(Runnable task) { + Map current = MDC.getCopyOfContextMap(); + return super.submit(() -> { + MDC.setContextMap(current); + task.run(); + }); + } + }; messageProcessingExecutor.setCorePoolSize(numberOfThreads); messageProcessingExecutor.setMaxPoolSize(numberOfThreads); messageProcessingExecutor.setThreadNamePrefix(getClass().getSimpleName()+"-"+queue.getName().getId()+"-");