diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractPipelineMessageListenerContainer.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractPipelineMessageListenerContainer.java index 79073c96c..e11a92bdb 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractPipelineMessageListenerContainer.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractPipelineMessageListenerContainer.java @@ -226,12 +226,17 @@ private TaskExecutor validateCustomExecutor(TaskExecutor taskExecutor) { protected BackPressureHandler createBackPressureHandler() { O containerOptions = getContainerOptions(); - return SemaphoreBackPressureHandler.builder().batchSize(containerOptions.getMaxMessagesPerPoll()) + BatchAwareBackPressureHandler backPressureHandler = SemaphoreBackPressureHandler.builder() + .batchSize(containerOptions.getMaxMessagesPerPoll()) .totalPermits(containerOptions.getMaxConcurrentMessages()) - .standbyLimitPollingInterval(containerOptions.getStandbyLimitPollingInterval()) .acquireTimeout(containerOptions.getMaxDelayBetweenPolls()) - .throughputConfiguration(containerOptions.getBackPressureMode()) - .backPressureLimiter(containerOptions.getBackPressureLimiter()).build(); + .throughputConfiguration(containerOptions.getBackPressureMode()).build(); + if (containerOptions.getBackPressureLimiter() != null) { + backPressureHandler = new BackPressureHandlerLimiter(backPressureHandler, + containerOptions.getBackPressureLimiter(), containerOptions.getStandbyLimitPollingInterval(), + containerOptions.getMaxDelayBetweenPolls()); + } + return backPressureHandler; } protected TaskExecutor createSourcesTaskExecutor() { diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BackPressureHandlerLimiter.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BackPressureHandlerLimiter.java new file mode 100644 index 000000000..aeb5a61cb --- /dev/null +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BackPressureHandlerLimiter.java @@ -0,0 +1,153 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.sqs.listener; + +import java.time.Duration; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * A {@link BatchAwareBackPressureHandler} implementation that uses an internal {@link Semaphore} for adapting the + * maximum number of permits that can be acquired by the {@link #backPressureHandler} based on the downstream + * backpressure limit computed by the {@link #backPressureLimiter}. + * + * @see BackPressureLimiter + */ +public class BackPressureHandlerLimiter implements BatchAwareBackPressureHandler { + + /** + * The {@link BatchAwareBackPressureHandler} which permits should be limited by the {@link #backPressureLimiter}. + */ + private final BatchAwareBackPressureHandler backPressureHandler; + + /** + * The {@link BackPressureLimiter} which computes a limit on how many permits can be requested at a given moment. + */ + private final BackPressureLimiter backPressureLimiter; + + /** + * The duration to wait for permits to be acquired. + */ + private final Duration acquireTimeout; + + /** + * The duration to sleep when the queue processing is in standby. + */ + private final Duration standbyLimitPollingInterval; + + /** + * The limit of permits that can be acquired at the current time. The permits limit is defined in the [0, + * Integer.MAX_VALUE] interval. A value of {@literal 0} means that no permits can be acquired. + *

+ * This value is updated based on the downstream backpressure reported by the {@link #backPressureLimiter}. + */ + private final AtomicInteger permitsLimit = new AtomicInteger(0); + + private final ReducibleSemaphore semaphore = new ReducibleSemaphore(0); + + public BackPressureHandlerLimiter(BatchAwareBackPressureHandler backPressureHandler, + BackPressureLimiter backPressureLimiter, Duration standbyLimitPollingInterval, Duration acquireTimeout) { + this.backPressureHandler = backPressureHandler; + this.backPressureLimiter = backPressureLimiter; + this.acquireTimeout = acquireTimeout; + this.standbyLimitPollingInterval = standbyLimitPollingInterval; + } + + @Override + public int requestBatch() throws InterruptedException { + int permits = updatePermitsLimit(); + int batchSize = getBatchSize(); + if (permits < batchSize) { + return acquirePermits(permits, backPressureHandler::request); + } + return acquirePermits(batchSize, p -> backPressureHandler.requestBatch()); + } + + @Override + public void releaseBatch() { + semaphore.release(getBatchSize()); + backPressureHandler.releaseBatch(); + } + + @Override + public int getBatchSize() { + return backPressureHandler.getBatchSize(); + } + + @Override + public int request(int amount) throws InterruptedException { + int permits = Math.min(updatePermitsLimit(), amount); + return acquirePermits(permits, backPressureHandler::request); + } + + @Override + public void release(int amount) { + semaphore.release(amount); + backPressureHandler.release(amount); + } + + @Override + public boolean drain(Duration timeout) { + return backPressureHandler.drain(timeout); + } + + private int updatePermitsLimit() { + return permitsLimit.updateAndGet(oldLimit -> { + int newLimit = Math.max(0, backPressureLimiter.limit()); + if (newLimit < oldLimit) { + int blockedPermits = oldLimit - newLimit; + semaphore.reducePermits(blockedPermits); + } + else if (newLimit > oldLimit) { + int releasedPermits = newLimit - oldLimit; + semaphore.release(releasedPermits); + } + return newLimit; + }); + } + + private interface PermitsRequester { + int request(int amount) throws InterruptedException; + } + + private int acquirePermits(int amount, PermitsRequester permitsRequester) throws InterruptedException { + if (amount == 0) { + Thread.sleep(standbyLimitPollingInterval.toMillis()); + return 0; + } + if (semaphore.tryAcquire(amount, acquireTimeout.toMillis(), TimeUnit.MILLISECONDS)) { + int obtained = permitsRequester.request(amount); + if (obtained < amount) { + semaphore.release(amount - obtained); + } + return obtained; + } + return 0; + } + + private static class ReducibleSemaphore extends Semaphore { + + ReducibleSemaphore(int permits) { + super(permits); + } + + @Override + public void reducePermits(int reduction) { + super.reducePermits(reduction); + } + } +} diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/SemaphoreBackPressureHandler.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/SemaphoreBackPressureHandler.java index e3d069bce..310b64519 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/SemaphoreBackPressureHandler.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/SemaphoreBackPressureHandler.java @@ -17,11 +17,9 @@ import java.time.Duration; import java.util.Arrays; -import java.util.Objects; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.Assert; @@ -37,63 +35,33 @@ public class SemaphoreBackPressureHandler implements BatchAwareBackPressureHandl private static final Logger logger = LoggerFactory.getLogger(SemaphoreBackPressureHandler.class); - private final BackPressureLimiter backPressureLimiter; - - private final ReducibleSemaphore semaphore; + private final Semaphore semaphore; private final int batchSize; - /** - * The theoretical maximum numbers of permits that can be acquired if no limit is set. - * @see #permitsLimit for the current limit. - */ private final int totalPermits; - /** - * The limit of permits that can be acquired at the current time. The permits limit is defined in the [0, - * totalPermits] interval. A value of {@literal 0} means that no permits can be acquired. - *

- * This value is updated based on the downstream backpressure reported by the {@link #backPressureLimiter}. - */ - private final AtomicInteger permitsLimit; - - /** - * The duration to sleep when the queue processing is in standby. - */ - private final Duration standbyLimitPollingInterval; - private final Duration acquireTimeout; private final BackPressureMode backPressureConfiguration; private volatile CurrentThroughputMode currentThroughputMode; - /** - * The number of permits acquired in low throughput mode. This value is minimum value between {@link #permitsLimit} - * at the time of the acquire and {@link #totalPermits}. - */ - private final AtomicInteger lowThroughputAcquiredPermits = new AtomicInteger(0); - private final AtomicBoolean hasAcquiredFullPermits = new AtomicBoolean(false); private String id; - private final AtomicBoolean isDraining = new AtomicBoolean(false); - private SemaphoreBackPressureHandler(Builder builder) { this.batchSize = builder.batchSize; this.totalPermits = builder.totalPermits; - this.standbyLimitPollingInterval = builder.standbyLimitPollingInterval; this.acquireTimeout = builder.acquireTimeout; this.backPressureConfiguration = builder.backPressureMode; - this.semaphore = new ReducibleSemaphore(totalPermits); + this.semaphore = new Semaphore(totalPermits); this.currentThroughputMode = BackPressureMode.FIXED_HIGH_THROUGHPUT.equals(backPressureConfiguration) ? CurrentThroughputMode.HIGH : CurrentThroughputMode.LOW; logger.debug("SemaphoreBackPressureHandler created with configuration {} and {} total permits", backPressureConfiguration, totalPermits); - this.permitsLimit = new AtomicInteger(totalPermits); - this.backPressureLimiter = Objects.requireNonNullElse(builder.backPressureLimiter, () -> totalPermits); } public static Builder builder() { @@ -112,17 +80,15 @@ public String getId() { @Override public int request(int amount) throws InterruptedException { - updateAvailablePermitsBasedOnDownstreamBackpressure(); return tryAcquire(amount, this.currentThroughputMode) ? amount : 0; } // @formatter:off @Override public int requestBatch() throws InterruptedException { - updateAvailablePermitsBasedOnDownstreamBackpressure(); - boolean useLowThroughput = CurrentThroughputMode.LOW.equals(this.currentThroughputMode) - || this.permitsLimit.get() < this.totalPermits; - return useLowThroughput ? requestInLowThroughputMode() : requestInHighThroughputMode(); + return CurrentThroughputMode.LOW.equals(this.currentThroughputMode) + ? requestInLowThroughputMode() + : requestInHighThroughputMode(); } private int requestInHighThroughputMode() throws InterruptedException { @@ -137,10 +103,10 @@ private int tryAcquirePartial() throws InterruptedException { if (availablePermits == 0 || BackPressureMode.ALWAYS_POLL_MAX_MESSAGES.equals(this.backPressureConfiguration)) { return 0; } - int permitsToRequest = min(availablePermits, this.batchSize); + int permitsToRequest = Math.min(availablePermits, this.batchSize); CurrentThroughputMode currentThroughputModeNow = this.currentThroughputMode; - logger.trace("Trying to acquire partial batch of {} permits from {} (limit {}) available for {} in TM {}", - permitsToRequest, availablePermits, this.permitsLimit.get(), this.id, currentThroughputModeNow); + logger.trace("Trying to acquire partial batch of {} permits from {} available for {} in TM {}", + permitsToRequest, availablePermits, this.id, currentThroughputModeNow); boolean hasAcquiredPartial = tryAcquire(permitsToRequest, currentThroughputModeNow); return hasAcquiredPartial ? permitsToRequest : 0; } @@ -148,35 +114,17 @@ private int tryAcquirePartial() throws InterruptedException { private int requestInLowThroughputMode() throws InterruptedException { // Although LTM can be set / unset by many processes, only the MessageSource thread gets here, // so no actual concurrency - logger.debug("Trying to acquire full permits for {}. Permits left: {}, Permits limit: {}", this.id, - this.semaphore.availablePermits(), this.permitsLimit.get()); - int permitsToRequest = min(this.permitsLimit.get(), this.totalPermits); - if (permitsToRequest == 0) { - logger.info("No permits usable for {} (limit = 0), sleeping for {}", this.id, - this.standbyLimitPollingInterval); - Thread.sleep(standbyLimitPollingInterval.toMillis()); - return 0; - } - boolean hasAcquired = tryAcquire(permitsToRequest, CurrentThroughputMode.LOW); + logger.debug("Trying to acquire full permits for {}. Permits left: {}", this.id, + this.semaphore.availablePermits()); + boolean hasAcquired = tryAcquire(this.totalPermits, CurrentThroughputMode.LOW); if (hasAcquired) { - if (permitsToRequest >= this.totalPermits) { - logger.debug("Acquired full permits for {}. Permits left: {}, Permits limit: {}", this.id, - this.semaphore.availablePermits(), this.permitsLimit.get()); - } - else { - logger.debug("Acquired limited permits ({}) for {} . Permits left: {}, Permits limit: {}", - permitsToRequest, this.id, this.semaphore.availablePermits(), this.permitsLimit.get()); - } - int tokens = min(this.batchSize, permitsToRequest); + logger.debug("Acquired full permits for {}. Permits left: {}", this.id, this.semaphore.availablePermits()); // We've acquired all permits - there's no other process currently processing messages if (!this.hasAcquiredFullPermits.compareAndSet(false, true)) { - logger.warn("hasAcquiredFullPermits was already true. Permits left: {}, Permits limit: {}", - this.semaphore.availablePermits(), this.permitsLimit.get()); + logger.warn("hasAcquiredFullPermits was already true. Permits left: {}", + this.semaphore.availablePermits()); } - else { - lowThroughputAcquiredPermits.set(permitsToRequest); - } - return tokens; + return this.batchSize; } else { return 0; @@ -184,20 +132,16 @@ private int requestInLowThroughputMode() throws InterruptedException { } private boolean tryAcquire(int amount, CurrentThroughputMode currentThroughputModeNow) throws InterruptedException { - if (isDraining.get()) { - return false; - } logger.trace("Acquiring {} permits for {} in TM {}", amount, this.id, this.currentThroughputMode); boolean hasAcquired = this.semaphore.tryAcquire(amount, this.acquireTimeout.toMillis(), TimeUnit.MILLISECONDS); if (hasAcquired) { - logger.trace("{} permits acquired for {} in TM {}. Permits left: {}, Permits limit: {}", amount, this.id, - currentThroughputModeNow, this.semaphore.availablePermits(), this.permitsLimit.get()); + logger.trace("{} permits acquired for {} in TM {}. Permits left: {}", amount, this.id, + currentThroughputModeNow, this.semaphore.availablePermits()); } else { - logger.trace( - "Not able to acquire {} permits in {} milliseconds for {} in TM {}. Permits left: {}, Permits limit: {}", - amount, this.acquireTimeout.toMillis(), this.id, currentThroughputModeNow, - this.semaphore.availablePermits(), this.permitsLimit.get()); + logger.trace("Not able to acquire {} permits in {} milliseconds for {} in TM {}. Permits left: {}", amount, + this.acquireTimeout.toMillis(), this.id, currentThroughputModeNow, + this.semaphore.availablePermits()); } return hasAcquired; } @@ -237,13 +181,11 @@ public void release(int amount) { } private int getPermitsToRelease(int amount) { - if (this.hasAcquiredFullPermits.compareAndSet(true, false)) { - int allAcquiredPermits = this.lowThroughputAcquiredPermits.getAndSet(0); - // The first process that gets here should release all permits except for inflight messages - // We can have only one batch of messages at this point since we have all permits - return (allAcquiredPermits - (min(this.batchSize, allAcquiredPermits) - amount)); - } - return amount; + return this.hasAcquiredFullPermits.compareAndSet(true, false) + // The first process that gets here should release all permits except for inflight messages + // We can have only one batch of messages at this point since we have all permits + ? this.totalPermits - (this.batchSize - amount) + : amount; } private void maybeSwitchToHighThroughputMode(int amount) { @@ -258,8 +200,6 @@ private void maybeSwitchToHighThroughputMode(int amount) { public boolean drain(Duration timeout) { logger.debug("Waiting for up to {} seconds for approx. {} permits to be released for {}", timeout.getSeconds(), this.totalPermits - this.semaphore.availablePermits(), this.id); - isDraining.set(true); - updateMaxPermitsLimit(this.totalPermits); try { return this.semaphore.tryAcquire(this.totalPermits, (int) timeout.getSeconds(), TimeUnit.SECONDS); } @@ -269,44 +209,6 @@ public boolean drain(Duration timeout) { } } - private int min(int a, int p) { - return Math.max(0, Math.min(a, p)); - } - - private void updateAvailablePermitsBasedOnDownstreamBackpressure() { - if (!isDraining.get()) { - int limit = backPressureLimiter.limit(); - int newCurrentMaxPermits = min(limit, totalPermits); - updateMaxPermitsLimit(newCurrentMaxPermits); - if (isDraining.get()) { - updateMaxPermitsLimit(totalPermits); - } - } - } - - private void updateMaxPermitsLimit(int newCurrentMaxPermits) { - int oldValue = permitsLimit.getAndUpdate(i -> min(newCurrentMaxPermits, totalPermits)); - if (newCurrentMaxPermits < oldValue) { - int blockedPermits = oldValue - newCurrentMaxPermits; - semaphore.reducePermits(blockedPermits); - } - else if (newCurrentMaxPermits > oldValue) { - int releasedPermits = newCurrentMaxPermits - oldValue; - semaphore.release(releasedPermits); - } - } - - private static class ReducibleSemaphore extends Semaphore { - ReducibleSemaphore(int permits) { - super(permits); - } - - @Override - public void reducePermits(int reduction) { - super.reducePermits(reduction); - } - } - private enum CurrentThroughputMode { HIGH, @@ -321,14 +223,10 @@ public static class Builder { private int totalPermits; - private Duration standbyLimitPollingInterval; - private Duration acquireTimeout; private BackPressureMode backPressureMode; - private BackPressureLimiter backPressureLimiter; - public Builder batchSize(int batchSize) { this.batchSize = batchSize; return this; @@ -339,11 +237,6 @@ public Builder totalPermits(int totalPermits) { return this; } - public Builder standbyLimitPollingInterval(Duration standbyLimitPollingInterval) { - this.standbyLimitPollingInterval = standbyLimitPollingInterval; - return this; - } - public Builder acquireTimeout(Duration acquireTimeout) { this.acquireTimeout = acquireTimeout; return this; @@ -354,14 +247,10 @@ public Builder throughputConfiguration(BackPressureMode backPressureConfiguratio return this; } - public Builder backPressureLimiter(BackPressureLimiter backPressureLimiter) { - this.backPressureLimiter = backPressureLimiter; - return this; - } - public SemaphoreBackPressureHandler build() { - Assert.noNullElements(Arrays.asList(this.batchSize, this.totalPermits, this.standbyLimitPollingInterval, - this.acquireTimeout, this.backPressureMode), "Missing configuration"); + Assert.noNullElements( + Arrays.asList(this.batchSize, this.totalPermits, this.acquireTimeout, this.backPressureMode), + "Missing configuration"); return new SemaphoreBackPressureHandler(this); }