diff --git a/all/pom.xml b/all/pom.xml index 28f689ada92..009c6cc6f34 100644 --- a/all/pom.xml +++ b/all/pom.xml @@ -463,6 +463,10 @@ io.helidon.common.features helidon-common-features + + io.helidon.common.concurrency + helidon-common-concurrency-limits + io.helidon.dbclient helidon-dbclient @@ -996,6 +1000,10 @@ io.helidon.webserver helidon-webserver-service-common + + io.helidon.webserver + helidon-webserver-concurrency-limits + io.helidon.webserver.testing.junit5 helidon-webserver-testing-junit5 diff --git a/bom/pom.xml b/bom/pom.xml index 73de6d8cff9..45dea7e1b97 100644 --- a/bom/pom.xml +++ b/bom/pom.xml @@ -610,6 +610,11 @@ helidon-common-features ${helidon.version} + + io.helidon.common.concurrency + helidon-common-concurrency-limits + ${helidon.version} + @@ -1303,6 +1308,11 @@ helidon-webserver-service-common ${helidon.version} + + io.helidon.webserver + helidon-webserver-concurrency-limits + ${helidon.version} + io.helidon.webserver.testing.junit5 helidon-webserver-testing-junit5 diff --git a/common/concurrency/limits/README.md b/common/concurrency/limits/README.md new file mode 100644 index 00000000000..c17184f10ff --- /dev/null +++ b/common/concurrency/limits/README.md @@ -0,0 +1,37 @@ +Concurrency Limits +----- + +This module provides concurrency limits, so we can limit the number of concurrent, in-progress operations (for example in WebServer). + +The implemented concurrency limits are: + +| Key | Weight | Description | +|---------|--------|--------------------------------------------------------------| +| `fixed` | `90` | Semaphore based concurrency limit, supports queueing | +| `aimd` | `80` | AIMD based limit (additive-increase/multiplicative-decrease) | + +Current usage: `helidon-webserver` + +The weight is not significant (unless you want to override an implementation using your own Limit with a higher weight), as the usages in Helidon use a single (optional) implementation that must be correctly typed in +configuration. + +# Fixed concurrency limit + +The fixed concurrency limit is based on a semaphore behavior. +You can define the number of available permits, then each time a token is requested, a permit (if available) is returned. +When the token is finished (through one of its lifecycle operations), the permit is returned. + +When the limit is set to 0, an unlimited implementation is used. + +The fixed limit also provides support for defining a queue. If set to a value above `0`, queuing is enabled. In such a case we enqueue a certain number of requests (with a configurable timeout). + +Defaults are: +- `permits: 0` - unlimited permits (no limit) +- `queue-length: 0` - no queuing +- `queue-timeout: PT1S` - 1 second timout in queue, if queuing is enabled + +# AIMD concurrency limit + +The additive-increase/multiplicative-decrease (AIMD) algorithm is a feedback control algorithm best known for its use in TCP congestion control. AIMD combines linear growth of the congestion window when there is no congestion with an exponential reduction when congestion is detected. + +This implementation provides variable concurrency limit with fixed minimal/maximal number of permits. diff --git a/common/concurrency/limits/pom.xml b/common/concurrency/limits/pom.xml new file mode 100644 index 00000000000..79f981da434 --- /dev/null +++ b/common/concurrency/limits/pom.xml @@ -0,0 +1,127 @@ + + + + + 4.0.0 + + io.helidon.common.concurrency + helidon-common-concurrency-project + 4.2.0-SNAPSHOT + ../pom.xml + + + helidon-common-concurrency-limits + Helidon Common Concurrency Limits + + + + io.helidon.common + helidon-common + + + io.helidon.builder + helidon-builder-api + + + io.helidon.common + helidon-common-config + + + io.helidon.service + helidon-service-registry + true + + + io.helidon.config + helidon-config + test + + + io.helidon.config + helidon-config-yaml + test + + + org.hamcrest + hamcrest-all + test + + + org.junit.jupiter + junit-jupiter-api + test + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + + io.helidon.codegen + helidon-codegen-apt + ${helidon.version} + + + io.helidon.builder + helidon-builder-codegen + ${helidon.version} + + + io.helidon.config.metadata + helidon-config-metadata-codegen + ${helidon.version} + + + io.helidon.codegen + helidon-codegen-helidon-copyright + ${helidon.version} + + + + + + io.helidon.codegen + helidon-codegen-apt + ${helidon.version} + + + io.helidon.builder + helidon-builder-codegen + ${helidon.version} + + + io.helidon.config.metadata + helidon-config-metadata-codegen + ${helidon.version} + + + io.helidon.codegen + helidon-codegen-helidon-copyright + ${helidon.version} + + + + + + diff --git a/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimit.java b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimit.java new file mode 100644 index 00000000000..1c901e8b78c --- /dev/null +++ b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimit.java @@ -0,0 +1,140 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.common.concurrency.limits; + +import java.util.Optional; +import java.util.concurrent.Callable; +import java.util.concurrent.Semaphore; +import java.util.function.Consumer; + +import io.helidon.builder.api.RuntimeType; +import io.helidon.common.config.Config; + +/** + * AIMD based limiter. + *

+ * The additive-increase/multiplicative-decrease (AIMD) algorithm is a feedback control algorithm best known for its use in TCP + * congestion control. AIMD combines linear growth of the congestion window when there is no congestion with an exponential + * reduction when congestion is detected. + */ +@SuppressWarnings("removal") +@RuntimeType.PrototypedBy(AimdLimitConfig.class) +public class AimdLimit implements Limit, SemaphoreLimit, RuntimeType.Api { + static final String TYPE = "aimd"; + + private final AimdLimitConfig config; + private final AimdLimitImpl aimdLimitImpl; + + private AimdLimit(AimdLimitConfig config) { + this.config = config; + this.aimdLimitImpl = new AimdLimitImpl(config); + } + + /** + * Create a new fluent API builder to construct {@link io.helidon.common.concurrency.limits.AimdLimit} + * instance. + * + * @return fluent API builder + */ + public static AimdLimitConfig.Builder builder() { + return AimdLimitConfig.builder(); + } + + /** + * Create a new instance with all defaults. + * + * @return a new limit instance + */ + public static AimdLimit create() { + return builder().build(); + } + + /** + * Create a new instance from configuration. + * + * @param config configuration of the AIMD limit + * @return a new limit instance configured from {@code config} + */ + public static AimdLimit create(Config config) { + return builder() + .config(config) + .build(); + } + + /** + * Create a new instance from configuration. + * + * @param config configuration of the AIMD limit + * @return a new limit instance configured from {@code config} + */ + public static AimdLimit create(AimdLimitConfig config) { + return new AimdLimit(config); + } + + /** + * Create a new instance customizing its configuration. + * + * @param consumer consumer of configuration builder + * @return a new limit instance configured from the builder + */ + public static AimdLimit create(Consumer consumer) { + return builder() + .update(consumer) + .build(); + } + + @Override + public T invoke(Callable callable) throws Exception { + return aimdLimitImpl.invoke(callable); + } + + @Override + public void invoke(Runnable runnable) throws Exception { + aimdLimitImpl.invoke(runnable); + } + + @Override + public Optional tryAcquire(boolean wait) { + return aimdLimitImpl.tryAcquire(); + } + + @SuppressWarnings("removal") + @Override + public Semaphore semaphore() { + return aimdLimitImpl.semaphore(); + } + + @Override + public String name() { + return config.name(); + } + + @Override + public String type() { + return TYPE; + } + + @Override + public AimdLimitConfig prototype() { + return config; + } + + @Override + public Limit copy() { + return config.build(); + } +} diff --git a/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimitConfigBlueprint.java b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimitConfigBlueprint.java new file mode 100644 index 00000000000..400fbb99682 --- /dev/null +++ b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimitConfigBlueprint.java @@ -0,0 +1,97 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.common.concurrency.limits; + +import java.time.Duration; +import java.util.Optional; +import java.util.function.Supplier; + +import io.helidon.builder.api.Option; +import io.helidon.builder.api.Prototype; +import io.helidon.common.concurrency.limits.spi.LimitProvider; + +/** + * Configuration of {@link io.helidon.common.concurrency.limits.AimdLimit}. + */ +@Prototype.Blueprint +@Prototype.Configured(value = AimdLimit.TYPE, root = false) +@Prototype.Provides(LimitProvider.class) +interface AimdLimitConfigBlueprint extends Prototype.Factory { + /** + * Backoff ratio to use for the algorithm. + * The value must be within [0.5, 1.0). + * + * @return backoff ratio + */ + @Option.Configured + @Option.DefaultDouble(0.9) + double backoffRatio(); + + /** + * Initial limit. + * The value must be within [{@link #minLimit()}, {@link #maxLimit()}]. + * + * @return initial limit + */ + @Option.Configured + @Option.DefaultInt(20) + int initialLimit(); + + /** + * Maximal limit. + * The value must be same or higher than {@link #minLimit()}. + * + * @return maximal limit + */ + @Option.Configured + @Option.DefaultInt(200) + int maxLimit(); + + /** + * Minimal limit. + * The value must be same or lower than {@link #maxLimit()}. + * + * @return minimal limit + */ + @Option.Configured + @Option.DefaultInt(20) + int minLimit(); + + /** + * Timeout that when exceeded is the same as if the task failed. + * + * @return task timeout, defaults to 5 seconds + */ + @Option.Configured + @Option.Default("PT5S") + Duration timeout(); + + /** + * A clock that supplies nanosecond time. + * + * @return supplier of current nanoseconds, defaults to {@link java.lang.System#nanoTime()} + */ + Optional> clock(); + + /** + * Name of this instance. + * + * @return name of the instance + */ + @Option.Default(AimdLimit.TYPE) + String name(); +} diff --git a/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimitImpl.java b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimitImpl.java new file mode 100644 index 00000000000..8842965d5a1 --- /dev/null +++ b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimitImpl.java @@ -0,0 +1,193 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.common.concurrency.limits; + +import java.io.Serial; +import java.util.Optional; +import java.util.concurrent.Callable; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Supplier; + +import io.helidon.common.config.ConfigException; + +class AimdLimitImpl { + private final double backoffRatio; + private final long timeoutInNanos; + private final int minLimit; + private final int maxLimit; + + private final Supplier clock; + private final AtomicInteger concurrentRequests; + private final AdjustableSemaphore semaphore; + + private final AtomicInteger limit; + private final Lock limitLock = new ReentrantLock(); + + AimdLimitImpl(AimdLimitConfig config) { + int initialLimit = config.initialLimit(); + this.backoffRatio = config.backoffRatio(); + this.timeoutInNanos = config.timeout().toNanos(); + this.minLimit = config.minLimit(); + this.maxLimit = config.maxLimit(); + this.clock = config.clock().orElseGet(() -> System::nanoTime); + + this.concurrentRequests = new AtomicInteger(); + this.semaphore = new AdjustableSemaphore(initialLimit); + + this.limit = new AtomicInteger(initialLimit); + + if (!(backoffRatio < 1.0 && backoffRatio >= 0.5)) { + throw new ConfigException("Backoff ratio must be within [0.5, 1.0)"); + } + if (maxLimit < minLimit) { + throw new ConfigException("Max limit must be higher than min limit, or equal to it"); + } + if (initialLimit > maxLimit) { + throw new ConfigException("Initial limit must be lower than max limit, or equal to it"); + } + if (initialLimit < minLimit) { + throw new ConfigException("Initial limit must be higher than minimum limit, or equal to it"); + } + } + + Semaphore semaphore() { + return semaphore; + } + + int currentLimit() { + return limit.get(); + } + + Optional tryAcquire() { + if (!semaphore.tryAcquire()) { + return Optional.empty(); + } + + return Optional.of(new AimdToken(clock, concurrentRequests)); + } + + void invoke(Runnable runnable) throws Exception { + invoke(() -> { + runnable.run(); + return null; + }); + } + + T invoke(Callable callable) throws Exception { + long startTime = clock.get(); + int currentRequests = concurrentRequests.incrementAndGet(); + + if (semaphore.tryAcquire()) { + try { + T response = callable.call(); + updateWithSample(startTime, clock.get(), currentRequests, true); + return response; + } catch (IgnoreTaskException e) { + return e.handle(); + } catch (Throwable e) { + updateWithSample(startTime, clock.get(), currentRequests, false); + throw e; + } finally { + concurrentRequests.decrementAndGet(); + semaphore.release(); + } + } else { + throw new LimitException("No more permits available for the semaphore"); + } + } + + void updateWithSample(long startTime, long endTime, int currentRequests, boolean success) { + long rtt = endTime - startTime; + + int currentLimit = limit.get(); + if (rtt > timeoutInNanos || !success) { + currentLimit = (int) (currentLimit * backoffRatio); + } else if (currentRequests * 2 >= currentLimit) { + currentLimit = currentLimit + 1; + } + setLimit(Math.min(maxLimit, Math.max(minLimit, currentLimit))); + } + + private void setLimit(int newLimit) { + if (newLimit == limit.get()) { + // already have the correct limit + return; + } + // now we lock, to do this only once in parallel, + // as otherwise we may end up in strange lands + limitLock.lock(); + try { + int oldLimit = limit.get(); + if (oldLimit == newLimit) { + // parallel thread already fixed it + return; + } + limit.set(newLimit); + + if (newLimit > oldLimit) { + this.semaphore.release(newLimit - oldLimit); + } else { + this.semaphore.reducePermits(oldLimit - newLimit); + } + } finally { + limitLock.unlock(); + } + } + + private static final class AdjustableSemaphore extends Semaphore { + @Serial + private static final long serialVersionUID = 114L; + + private AdjustableSemaphore(int permits) { + super(permits); + } + + @Override + protected void reducePermits(int reduction) { + super.reducePermits(reduction); + } + } + + private class AimdToken implements Limit.Token { + private final long startTime; + private final int currentRequests; + + private AimdToken(Supplier clock, AtomicInteger concurrentRequests) { + startTime = clock.get(); + currentRequests = concurrentRequests.incrementAndGet(); + } + + @Override + public void dropped() { + updateWithSample(startTime, clock.get(), currentRequests, false); + } + + @Override + public void ignore() { + concurrentRequests.decrementAndGet(); + } + + @Override + public void success() { + updateWithSample(startTime, clock.get(), currentRequests, true); + concurrentRequests.decrementAndGet(); + } + } +} diff --git a/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimitProvider.java b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimitProvider.java new file mode 100644 index 00000000000..ae0af21e0ab --- /dev/null +++ b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimitProvider.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.common.concurrency.limits; + +import io.helidon.common.Weight; +import io.helidon.common.concurrency.limits.spi.LimitProvider; +import io.helidon.common.config.Config; + +/** + * {@link java.util.ServiceLoader} service provider for {@link io.helidon.common.concurrency.limits.AimdLimit} + * limit implementation. + */ +@Weight(80) +public class AimdLimitProvider implements LimitProvider { + /** + * Constructor required by the service loader. + */ + public AimdLimitProvider() { + } + + @Override + public String configKey() { + return AimdLimit.TYPE; + } + + @Override + public Limit create(Config config, String name) { + return AimdLimit.builder() + .config(config) + .name(name) + .build(); + } +} diff --git a/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/FixedLimit.java b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/FixedLimit.java new file mode 100644 index 00000000000..97255156e4e --- /dev/null +++ b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/FixedLimit.java @@ -0,0 +1,349 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.common.concurrency.limits; + +import java.time.Duration; +import java.util.Optional; +import java.util.concurrent.Callable; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +import io.helidon.builder.api.RuntimeType; +import io.helidon.common.config.Config; + +/** + * Semaphore based limit, that supports queuing for a permit, and timeout on the queue. + * The default behavior is non-queuing. + * + * @see io.helidon.common.concurrency.limits.FixedLimitConfig + */ +@SuppressWarnings("removal") +@RuntimeType.PrototypedBy(FixedLimitConfig.class) +public class FixedLimit implements Limit, SemaphoreLimit, RuntimeType.Api { + /** + * Default limit, meaning unlimited execution. + */ + public static final int DEFAULT_LIMIT = 0; + /** + * Default length of the queue. + */ + public static final int DEFAULT_QUEUE_LENGTH = 0; + /** + * Timeout of a request that is enqueued. + */ + public static final String DEFAULT_QUEUE_TIMEOUT_DURATION = "PT1S"; + + static final String TYPE = "fixed"; + + private final FixedLimitConfig config; + private final LimiterHandler handler; + private final int initialPermits; + + private FixedLimit(FixedLimitConfig config) { + this.config = config; + if (config.permits() == 0 && config.semaphore().isEmpty()) { + this.handler = new NoOpSemaphoreHandler(); + this.initialPermits = 0; + } else { + Semaphore semaphore = config.semaphore().orElseGet(() -> new Semaphore(config.permits(), config.fair())); + this.initialPermits = semaphore.availablePermits(); + if (config.queueLength() == 0) { + this.handler = new RealSemaphoreHandler(semaphore); + } else { + this.handler = new QueuedSemaphoreHandler(semaphore, + config.queueLength(), + config.queueTimeout()); + } + } + } + + /** + * Create a new fluent API builder to construct {@link FixedLimit} + * instance. + * + * @return fluent API builder + */ + public static FixedLimitConfig.Builder builder() { + return FixedLimitConfig.builder(); + } + + /** + * Create a new instance with all defaults (no limit). + * + * @return a new limit instance + */ + public static FixedLimit create() { + return builder().build(); + } + + /** + * Create an instance from the provided semaphore. + * + * @param semaphore semaphore to use + * @return a new fixed limit backed by the provided semaphore + */ + public static FixedLimit create(Semaphore semaphore) { + return builder() + .semaphore(semaphore) + .build(); + } + + /** + * Create a new instance from configuration. + * + * @param config configuration of the fixed limit + * @return a new limit instance configured from {@code config} + */ + public static FixedLimit create(Config config) { + return builder() + .config(config) + .build(); + } + + /** + * Create a new instance from configuration. + * + * @param config configuration of the fixed limit + * @return a new limit instance configured from {@code config} + */ + public static FixedLimit create(FixedLimitConfig config) { + return new FixedLimit(config); + } + + /** + * Create a new instance customizing its configuration. + * + * @param consumer consumer of configuration builder + * @return a new limit instance configured from the builder + */ + public static FixedLimit create(Consumer consumer) { + return builder() + .update(consumer) + .build(); + } + + @Override + public T invoke(Callable callable) throws Exception { + return handler.invoke(callable); + } + + @Override + public void invoke(Runnable runnable) throws Exception { + handler.invoke(runnable); + } + + @Override + public Optional tryAcquire(boolean wait) { + return handler.tryAcquire(wait); + } + + @SuppressWarnings("removal") + @Override + public Semaphore semaphore() { + return handler.semaphore(); + } + + @Override + public FixedLimitConfig prototype() { + return config; + } + + @Override + public String name() { + return config.name(); + } + + @Override + public String type() { + return FixedLimit.TYPE; + } + + @Override + public Limit copy() { + if (config.semaphore().isPresent()) { + Semaphore semaphore = config.semaphore().get(); + + return FixedLimitConfig.builder() + .from(config) + .semaphore(new Semaphore(initialPermits, semaphore.isFair())) + .build(); + } + return config.build(); + } + + @SuppressWarnings("removal") + private interface LimiterHandler extends SemaphoreLimit, LimitAlgorithm { + } + + private static class NoOpSemaphoreHandler implements LimiterHandler { + private static final Token TOKEN = new Token() { + @Override + public void dropped() { + } + + @Override + public void ignore() { + } + + @Override + public void success() { + } + }; + + @Override + public T invoke(Callable callable) throws Exception { + try { + return callable.call(); + } catch (IgnoreTaskException e) { + return e.handle(); + } + } + + @Override + public void invoke(Runnable runnable) { + runnable.run(); + } + + @Override + public Optional tryAcquire(boolean wait) { + return Optional.of(TOKEN); + } + + @SuppressWarnings("removal") + @Override + public Semaphore semaphore() { + return NoopSemaphore.INSTANCE; + } + } + + @SuppressWarnings("removal") + private static class RealSemaphoreHandler implements LimiterHandler { + private final Semaphore semaphore; + + private RealSemaphoreHandler(Semaphore semaphore) { + this.semaphore = semaphore; + } + + @Override + public T invoke(Callable callable) throws Exception { + if (semaphore.tryAcquire()) { + try { + return callable.call(); + } catch (IgnoreTaskException e) { + return e.handle(); + } finally { + semaphore.release(); + } + } else { + throw new LimitException("No more permits available for the semaphore"); + } + } + + @Override + public void invoke(Runnable runnable) throws Exception { + if (semaphore.tryAcquire()) { + try { + runnable.run(); + } catch (IgnoreTaskException e) { + e.handle(); + } finally { + semaphore.release(); + } + } else { + throw new LimitException("No more permits available for the semaphore"); + } + } + + @Override + public Optional tryAcquire(boolean wait) { + if (!semaphore.tryAcquire()) { + return Optional.empty(); + } + return Optional.of(new SemaphoreToken(semaphore)); + } + + @Override + public Semaphore semaphore() { + return semaphore; + } + } + + private static class QueuedSemaphoreHandler implements LimiterHandler { + private final Semaphore semaphore; + private final int queueLength; + private final long timeoutMillis; + + private QueuedSemaphoreHandler(Semaphore semaphore, int queueLength, Duration queueTimeout) { + this.semaphore = semaphore; + this.queueLength = queueLength; + this.timeoutMillis = queueTimeout.toMillis(); + } + + @Override + public Optional tryAcquire(boolean wait) { + if (semaphore.getQueueLength() >= this.queueLength) { + // this is an estimate - we do not promise to be precise here + return Optional.empty(); + } + + try { + if (wait) { + if (!semaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS)) { + return Optional.empty(); + } + } else { + if (!semaphore.tryAcquire()) { + return Optional.empty(); + } + } + + } catch (InterruptedException e) { + return Optional.empty(); + } + return Optional.of(new SemaphoreToken(semaphore)); + } + + @Override + public Semaphore semaphore() { + return semaphore; + } + } + + private static class SemaphoreToken implements Token { + private final Semaphore semaphore; + + private SemaphoreToken(Semaphore semaphore) { + this.semaphore = semaphore; + } + + @Override + public void dropped() { + semaphore.release(); + } + + @Override + public void ignore() { + semaphore.release(); + } + + @Override + public void success() { + semaphore.release(); + } + } +} diff --git a/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/FixedLimitConfigBlueprint.java b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/FixedLimitConfigBlueprint.java new file mode 100644 index 00000000000..c5e672c70cd --- /dev/null +++ b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/FixedLimitConfigBlueprint.java @@ -0,0 +1,99 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.common.concurrency.limits; + +import java.time.Duration; +import java.util.Optional; +import java.util.concurrent.Semaphore; + +import io.helidon.builder.api.Option; +import io.helidon.builder.api.Prototype; +import io.helidon.common.concurrency.limits.spi.LimitProvider; + +/** + * Configuration of {@link FixedLimit}. + * + * @see #permits() + * @see #queueLength() + * @see #queueTimeout() + */ +@Prototype.Blueprint +@Prototype.Configured(value = FixedLimit.TYPE, root = false) +@Prototype.Provides(LimitProvider.class) +interface FixedLimitConfigBlueprint extends Prototype.Factory { + /** + * Number of permit to allow. + * Defaults to {@value FixedLimit#DEFAULT_LIMIT}. + * When set to {@code 0}, we switch to unlimited. + * + * @return number of permits + */ + @Option.Configured + @Option.DefaultInt(FixedLimit.DEFAULT_LIMIT) + int permits(); + + /** + * Whether the {@link java.util.concurrent.Semaphore} should be {@link java.util.concurrent.Semaphore#isFair()}. + * Defaults to {@code false}. + * + * @return whether this should be a fair semaphore + */ + @Option.Configured + @Option.DefaultBoolean(false) + boolean fair(); + + /** + * How many requests can be enqueued waiting for a permit. + * Note that this may not be an exact behavior due to concurrent invocations. + * We use {@link java.util.concurrent.Semaphore#getQueueLength()} in the + * {@link io.helidon.common.concurrency.limits.FixedLimit} implementation. + * Default value is {@value FixedLimit#DEFAULT_QUEUE_LENGTH}. + * If set to {code 0}, there is no queueing. + * + * @return number of requests to enqueue + */ + @Option.Configured + @Option.DefaultInt(FixedLimit.DEFAULT_QUEUE_LENGTH) + int queueLength(); + + /** + * How long to wait for a permit when enqueued. + * Defaults to {@value FixedLimit#DEFAULT_QUEUE_TIMEOUT_DURATION} + * + * @return duration of the timeout + */ + @Option.Configured + @Option.Default(FixedLimit.DEFAULT_QUEUE_TIMEOUT_DURATION) + Duration queueTimeout(); + + /** + * Name of this instance. + * + * @return name of the instance + */ + @Option.Default(FixedLimit.TYPE) + String name(); + + /** + * Explicitly configured semaphore. + * Note that if this is set, all other configuration is ignored. + * + * @return semaphore instance + */ + Optional semaphore(); + +} diff --git a/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/FixedLimitProvider.java b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/FixedLimitProvider.java new file mode 100644 index 00000000000..2d221579bf1 --- /dev/null +++ b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/FixedLimitProvider.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.common.concurrency.limits; + +import io.helidon.common.Weight; +import io.helidon.common.concurrency.limits.spi.LimitProvider; +import io.helidon.common.config.Config; + +/** + * {@link java.util.ServiceLoader} service provider for {@link FixedLimit} + * limit implementation. + */ +@Weight(90) +public class FixedLimitProvider implements LimitProvider { + /** + * Constructor required by the service loader. + */ + public FixedLimitProvider() { + } + + @Override + public String configKey() { + return FixedLimit.TYPE; + } + + @Override + public Limit create(Config config, String name) { + return FixedLimit.builder() + .config(config) + .name(name) + .build(); + } +} diff --git a/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/IgnoreTaskException.java b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/IgnoreTaskException.java new file mode 100644 index 00000000000..e6e7458fb89 --- /dev/null +++ b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/IgnoreTaskException.java @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.common.concurrency.limits; + +import java.util.Objects; + +/** + * If this exception is thrown from a limited task within + * {@link Limit#invoke(java.util.concurrent.Callable)}, the + * invocation will be ignored by possible algorithms (for example when considering round-trip timing). + *

+ * This should be used for cases where we never got to execute the intended task. + * This exception should never be thrown by {@link Limit}, it should always + * be translated to a proper return type, or actual exception. + */ +public class IgnoreTaskException extends RuntimeException { + /** + * Desired return value, if we want to ignore the result, yet we still provide valid response. + */ + private final Object returnValue; + /** + * Exception to throw to the user. This is to allow throwing an exception while ignoring it for limits algorithm. + */ + private final Exception exception; + + /** + * Create a new instance with a cause. + * + * @param cause the cause of this exception + */ + public IgnoreTaskException(Exception cause) { + super(Objects.requireNonNull(cause)); + + this.exception = cause; + this.returnValue = null; + } + + /** + * Create a new instance with a return value. + * + * @param returnValue value to return, even though this invocation should be ignored + * return value may be {@code null}. + */ + public IgnoreTaskException(Object returnValue) { + this.exception = null; + this.returnValue = returnValue; + } + + /** + * This is used by limit implementations to either return the value, or throw an exception. + * + * @return the value provided to be the return value + * @param type of the return value + * @throws Exception exception provided by the task + */ + @SuppressWarnings("unchecked") + public T handle() throws Exception { + if (returnValue == null && exception != null) { + throw exception; + } + return (T) returnValue; + } +} diff --git a/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/Limit.java b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/Limit.java new file mode 100644 index 00000000000..fc375762006 --- /dev/null +++ b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/Limit.java @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.common.concurrency.limits; + +import io.helidon.common.config.NamedService; +import io.helidon.service.registry.Service; + +/** + * Contract for a concurrency limiter. + */ +@Service.Contract +public interface Limit extends LimitAlgorithm, NamedService { + /** + * Create a copy of this limit with the same configuration. + * + * @return a copy of this limit + */ + Limit copy(); +} diff --git a/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/LimitAlgorithm.java b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/LimitAlgorithm.java new file mode 100644 index 00000000000..c81761abb65 --- /dev/null +++ b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/LimitAlgorithm.java @@ -0,0 +1,145 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.common.concurrency.limits; + +import java.util.Optional; +import java.util.concurrent.Callable; + +/** + * Concurrency limit algorithm. + *

+ * There are two options how to use a limit - by handling a token provided by {@link #tryAcquire()}, + * or by invoking a callable or runnable through one of the invoke methods (such as {@link #invoke(Runnable)}. + *

+ * The invoke methods are backed by the same {@link #tryAcquire()} methods, so behavior is consistent. + */ +public interface LimitAlgorithm { + /** + * Invoke a callable within the limits of this limiter. + *

+ * {@link io.helidon.common.concurrency.limits.Limit} implementors note: + * Make sure to catch {@link io.helidon.common.concurrency.limits.IgnoreTaskException} from the + * callable, and call its {@link IgnoreTaskException#handle()} to either return the provided result, + * or throw the exception after ignoring the timing for future decisions. + * + * @param callable callable to execute within the limit + * @param the callable return type + * @return result of the callable + * @throws LimitException in case the limiter did not have an available permit + * @throws java.lang.Exception in case the task failed with an exception + */ + default T invoke(Callable callable) throws LimitException, Exception { + Optional token = tryAcquire(); + if (token.isEmpty()) { + throw new LimitException("No token available."); + } + Token permit = token.get(); + try { + T response = callable.call(); + permit.success(); + return response; + } catch (IgnoreTaskException e) { + permit.ignore(); + return e.handle(); + } catch (Exception e) { + permit.dropped(); + throw e; + } + } + + /** + * Invoke a runnable within the limits of this limiter. + *

+ * {@link io.helidon.common.concurrency.limits.Limit} implementors note: + * Make sure to catch {@link io.helidon.common.concurrency.limits.IgnoreTaskException} from the + * runnable, and call its {@link IgnoreTaskException#handle()} to either return the provided result, + * or throw the exception after ignoring the timing for future decisions. + * + * @param runnable runnable to execute within the limit + * @throws LimitException in case the limiter did not have an available permit + * @throws java.lang.Exception in case the task failed with an exception + */ + default void invoke(Runnable runnable) throws LimitException, Exception { + Optional token = tryAcquire(); + if (token.isEmpty()) { + throw new LimitException("No token available."); + } + Token permit = token.get(); + try { + runnable.run(); + permit.success(); + } catch (IgnoreTaskException e) { + permit.ignore(); + e.handle(); + } catch (Exception e) { + permit.dropped(); + throw e; + } + } + + /** + * Try to acquire a token, waiting for available permits for the configured amount of time, if queuing is enabled. + *

+ * If acquired, the caller must call one of the {@link io.helidon.common.concurrency.limits.Limit.Token} + * operations to release the token. + * If the response is empty, the limit does not have an available token. + * + * @return acquired token, or empty if there is no available token + */ + default Optional tryAcquire() { + return tryAcquire(true); + } + + /** + * Try to acquire a token, waiting for available permits for the configured amount of time, if + * {@code wait} is enabled, returning immediately otherwise. + *

+ * If acquired, the caller must call one of the {@link io.helidon.common.concurrency.limits.Limit.Token} + * operations to release the token. + * If the response is empty, the limit does not have an available token. + * + * @param wait whether to wait in the queue (if one is configured/available in the limit), or to return immediately + * @return acquired token, or empty if there is no available token + */ + Optional tryAcquire(boolean wait); + + /** + * When a token is retrieved from {@link #tryAcquire()}, one of its methods must be called when the task + * is over, to release the token back to the pool (such as a permit returned to a {@link java.util.concurrent.Semaphore}). + *

+ * Choice of method to invoke may influence the algorithm used for determining number of available permits. + */ + interface Token { + /** + * Operation was dropped, for example because it hit a timeout, or was rejected by other limits. + * Loss based {@link io.helidon.common.concurrency.limits.Limit} implementations will likely do an aggressive + * reducing in limit when this happens. + */ + void dropped(); + + /** + * The operation failed before any meaningful RTT measurement could be made and should be ignored to not + * introduce an artificially low RTT. + */ + void ignore(); + + /** + * Notification that the operation succeeded and internally measured latency should be used as an RTT sample. + */ + void success(); + } +} diff --git a/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/LimitException.java b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/LimitException.java new file mode 100644 index 00000000000..987d9cf99fa --- /dev/null +++ b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/LimitException.java @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.common.concurrency.limits; + +import java.util.Objects; + +/** + * A limit was reached and the submitted task cannot be executed. + * + * @see io.helidon.common.concurrency.limits.Limit#invoke(java.util.concurrent.Callable) + * @see io.helidon.common.concurrency.limits.Limit#invoke(Runnable) + */ +public class LimitException extends RuntimeException { + /** + * A new limit exception with a cause. + * + * @param cause cause of the limit reached + */ + public LimitException(Exception cause) { + super(Objects.requireNonNull(cause)); + } + + /** + * A new limit exception with a message. + * + * @param message description of why the limit was reached + */ + public LimitException(String message) { + super(Objects.requireNonNull(message)); + } +} diff --git a/webserver/webserver/src/main/java/io/helidon/webserver/NoopSemaphore.java b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/NoopSemaphore.java similarity index 79% rename from webserver/webserver/src/main/java/io/helidon/webserver/NoopSemaphore.java rename to common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/NoopSemaphore.java index a20c8790089..b86210e0f77 100644 --- a/webserver/webserver/src/main/java/io/helidon/webserver/NoopSemaphore.java +++ b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/NoopSemaphore.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023 Oracle and/or its affiliates. + * Copyright (c) 2023, 2024 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,18 +14,28 @@ * limitations under the License. */ -package io.helidon.webserver; +package io.helidon.common.concurrency.limits; import java.util.Collection; import java.util.Set; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; -/* +/** * A semaphore that does nothing. + * Use {@link #INSTANCE} to get an instance of this semaphore. + * + * @deprecated this is only provided for backward compatibility and will be removed, use + * {@link FixedLimit#create()} to get unlimited limit */ -class NoopSemaphore extends Semaphore { - NoopSemaphore() { +@Deprecated(forRemoval = true, since = "4.2.0") +public class NoopSemaphore extends Semaphore { + /** + * Singleton instance to be used whenever needed. + */ + public static final Semaphore INSTANCE = new NoopSemaphore(); + + private NoopSemaphore() { super(0); } diff --git a/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/SemaphoreLimit.java b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/SemaphoreLimit.java new file mode 100644 index 00000000000..77332ce397b --- /dev/null +++ b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/SemaphoreLimit.java @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.common.concurrency.limits; + +import java.util.concurrent.Semaphore; + +/** + * The {@link io.helidon.common.concurrency.limits.Limit} is backed by a semaphore, and this provides + * direct access to the semaphore. + * Note that this usage may bypass calculation of limits if the semaphore is used directly. + * This is for backward compatibility only, and will be removed. + * + * @deprecated DO NOT USE except for backward compatibility with semaphore based handling + */ +@Deprecated(since = "4.2.0", forRemoval = true) +public interface SemaphoreLimit { + /** + * Underlying semaphore of this limit. + * + * @return the semaphore instance + * @deprecated this only exists for backward compatibility of Helidon WebServer and will be removed + */ + @Deprecated(forRemoval = true, since = "4.2.0") + Semaphore semaphore(); +} diff --git a/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/package-info.java b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/package-info.java new file mode 100644 index 00000000000..2804c1ef810 --- /dev/null +++ b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/package-info.java @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Concurrency limits API and default implementations. + * + * @see io.helidon.common.concurrency.limits.Limit + * @see io.helidon.common.concurrency.limits.FixedLimit + */ +package io.helidon.common.concurrency.limits; diff --git a/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/spi/LimitProvider.java b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/spi/LimitProvider.java new file mode 100644 index 00000000000..c192b734ff9 --- /dev/null +++ b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/spi/LimitProvider.java @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.common.concurrency.limits.spi; + +import io.helidon.common.concurrency.limits.Limit; +import io.helidon.common.config.ConfiguredProvider; +import io.helidon.service.registry.Service; + +/** + * A {@link java.util.ServiceLoader} (and service registry) service provider to discover rate limits. + */ +@Service.Contract +public interface LimitProvider extends ConfiguredProvider { +} diff --git a/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/spi/package-info.java b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/spi/package-info.java new file mode 100644 index 00000000000..e88d31d397d --- /dev/null +++ b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/spi/package-info.java @@ -0,0 +1,20 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Extension points to create custom concurrency rate limits. + */ +package io.helidon.common.concurrency.limits.spi; diff --git a/common/concurrency/limits/src/main/java/module-info.java b/common/concurrency/limits/src/main/java/module-info.java new file mode 100644 index 00000000000..f1b23377399 --- /dev/null +++ b/common/concurrency/limits/src/main/java/module-info.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Concurrency limits. + * + * @see io.helidon.common.concurrency.limits + */ +module io.helidon.common.concurrency.limits { + requires static io.helidon.service.registry; + + requires io.helidon.builder.api; + requires io.helidon.common; + requires io.helidon.common.config; + + exports io.helidon.common.concurrency.limits; + exports io.helidon.common.concurrency.limits.spi; + + provides io.helidon.common.concurrency.limits.spi.LimitProvider + with io.helidon.common.concurrency.limits.FixedLimitProvider, + io.helidon.common.concurrency.limits.AimdLimitProvider; +} \ No newline at end of file diff --git a/common/concurrency/limits/src/main/resources/META-INF/helidon/service.loader b/common/concurrency/limits/src/main/resources/META-INF/helidon/service.loader new file mode 100644 index 00000000000..e2dd011e9fa --- /dev/null +++ b/common/concurrency/limits/src/main/resources/META-INF/helidon/service.loader @@ -0,0 +1,2 @@ +# List of service contracts we want to support either from service registry, or from service loader +io.helidon.common.concurrency.limits.spi.LimitProvider diff --git a/common/concurrency/limits/src/test/java/io/helidon/common/concurrency/limits/AimdLimitTest.java b/common/concurrency/limits/src/test/java/io/helidon/common/concurrency/limits/AimdLimitTest.java new file mode 100644 index 00000000000..b2f8615eb1d --- /dev/null +++ b/common/concurrency/limits/src/test/java/io/helidon/common/concurrency/limits/AimdLimitTest.java @@ -0,0 +1,168 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.common.concurrency.limits; + +import java.time.Duration; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.jupiter.api.Test; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; + +public class AimdLimitTest { + @Test + void decreaseOnDrops() { + AimdLimitConfig config = AimdLimitConfig.builder() + .initialLimit(30) + .buildPrototype(); + + AimdLimitImpl limiter = new AimdLimitImpl(config); + + assertThat(limiter.currentLimit(), is(30)); + limiter.updateWithSample(0, 0, 0, false); + assertThat(limiter.currentLimit(), is(27)); + } + + @Test + void decreaseOnTimeoutExceeded() { + Duration timeout = Duration.ofSeconds(1); + AimdLimitConfig config = AimdLimitConfig.builder() + .initialLimit(30) + .timeout(timeout) + .buildPrototype(); + AimdLimitImpl limiter = new AimdLimitImpl(config); + limiter.updateWithSample(0, timeout.toNanos() + 1, 0, true); + assertThat(limiter.currentLimit(), is(27)); + } + + @Test + void increaseOnSuccess() { + AimdLimitConfig config = AimdLimitConfig.builder() + .initialLimit(20) + .buildPrototype(); + AimdLimitImpl limiter = new AimdLimitImpl(config); + limiter.updateWithSample(0, Duration.ofMillis(1).toNanos(), 10, true); + assertThat(limiter.currentLimit(), is(21)); + } + + @Test + void successOverflow() { + AimdLimitConfig config = AimdLimitConfig.builder() + .initialLimit(21) + .maxLimit(21) + .minLimit(0) + .buildPrototype(); + AimdLimitImpl limiter = new AimdLimitImpl(config); + limiter.updateWithSample(0, Duration.ofMillis(1).toNanos(), 10, true); + // after success limit should still be at the max. + assertThat(limiter.currentLimit(), is(21)); + } + + @Test + void testDefault() { + AimdLimitConfig config = AimdLimitConfig.builder() + .minLimit(10) + .initialLimit(10) + .buildPrototype(); + AimdLimitImpl limiter = new AimdLimitImpl(config); + assertThat(limiter.currentLimit(), is(10)); + } + + @Test + void concurrentUpdatesAndReads() throws InterruptedException { + AimdLimitConfig config = AimdLimitConfig.builder() + .initialLimit(1) + .backoffRatio(0.9) + .timeout(Duration.ofMillis(100)) + .minLimit(1) + .maxLimit(200) + .buildPrototype(); + AimdLimitImpl limit = new AimdLimitImpl(config); + + int threadCount = 100; + int operationsPerThread = 1_000; + ExecutorService executor = Executors.newFixedThreadPool(threadCount); + CountDownLatch startLatch = new CountDownLatch(1); + CountDownLatch endLatch = new CountDownLatch(threadCount); + + AtomicInteger successCount = new AtomicInteger(0); + AtomicInteger timeoutCount = new AtomicInteger(0); + AtomicInteger dropCount = new AtomicInteger(0); + + for (int i = 0; i < threadCount; i++) { + executor.submit(() -> { + try { + startLatch.await(); // Wait for all threads to be ready + for (int j = 0; j < operationsPerThread; j++) { + long startTime = System.nanoTime(); + long rtt = (long) (Math.random() * 200_000_000); // 0-200ms + int concurrentRequests = (int) (Math.random() * limit.currentLimit() * 2); + boolean didDrop = Math.random() < 0.01; // 1% chance of drop + + limit.updateWithSample(startTime, rtt, concurrentRequests, !didDrop); + + if (didDrop) { + dropCount.incrementAndGet(); + } else if (rtt > config.timeout().toNanos()) { + timeoutCount.incrementAndGet(); + } else { + successCount.incrementAndGet(); + } + + // Read the current limit + int currentLimit = limit.currentLimit(); + assertThat(currentLimit, is(greaterThanOrEqualTo(config.minLimit()))); + assertThat(currentLimit, is(lessThanOrEqualTo(config.maxLimit()))); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + endLatch.countDown(); + } + }); + } + + startLatch.countDown(); // Start all threads + boolean finished = endLatch.await(10, TimeUnit.SECONDS); + executor.shutdown(); + + assertThat("Test did not complete in time", finished, is(true)); + + assertThat("Total operations mismatch", + threadCount * operationsPerThread, + is(successCount.get() + timeoutCount.get() + dropCount.get())); + } + + @Test + public void testSemaphoreReleased() throws Exception { + Limit limit = AimdLimit.builder() + .minLimit(5) + .initialLimit(5) + .build(); + + for (int i = 0; i < 5000; i++) { + limit.invoke(() -> {}); + } + } +} diff --git a/common/concurrency/limits/src/test/java/io/helidon/common/concurrency/limits/ConfiguredLimitTest.java b/common/concurrency/limits/src/test/java/io/helidon/common/concurrency/limits/ConfiguredLimitTest.java new file mode 100644 index 00000000000..9c9a79e96bb --- /dev/null +++ b/common/concurrency/limits/src/test/java/io/helidon/common/concurrency/limits/ConfiguredLimitTest.java @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.common.concurrency.limits; + +import java.time.Duration; +import java.util.Optional; + +import io.helidon.config.Config; + +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.MatcherAssert.assertThat; + +public class ConfiguredLimitTest { + private static Config config; + + @BeforeAll + public static void init() { + config = Config.create(); + } + + @Test + public void testFixed() { + LimitUsingConfig limitConfig = LimitUsingConfig.create(config.get("first")); + Optional configuredLimit = limitConfig.concurrencyLimit(); + assertThat(configuredLimit, not(Optional.empty())); + Limit limit = configuredLimit.get(); + + assertThat(limit.name(), is("server-listener")); + assertThat(limit.type(), is("fixed")); + + FixedLimitConfig prototype = ((FixedLimit) limit).prototype(); + assertThat("Permits", prototype.permits(), is(1)); + assertThat("Queue length", prototype.queueLength(), is(20)); + assertThat("Should be fair", prototype.fair(), is(true)); + assertThat("Queue timeout", prototype.queueTimeout(), is(Duration.ofSeconds(42))); + } + + @Test + public void testAimd() { + LimitUsingConfig limitConfig = LimitUsingConfig.create(config.get("second")); + Optional configuredLimit = limitConfig.concurrencyLimit(); + assertThat(configuredLimit, not(Optional.empty())); + Limit limit = configuredLimit.get(); + + assertThat(limit.name(), is("aimd")); + assertThat(limit.type(), is("aimd")); + + AimdLimitConfig prototype = ((AimdLimit) limit).prototype(); + assertThat("Timeout", prototype.timeout(), is(Duration.ofSeconds(42))); + assertThat("Min limit", prototype.minLimit(), is(11)); + assertThat("Max limit", prototype.maxLimit(), is(22)); + assertThat("Initial limit", prototype.initialLimit(), is(14)); + assertThat("Backoff ratio", prototype.backoffRatio(), is(0.74)); + } +} diff --git a/common/concurrency/limits/src/test/java/io/helidon/common/concurrency/limits/FixedLimitTest.java b/common/concurrency/limits/src/test/java/io/helidon/common/concurrency/limits/FixedLimitTest.java new file mode 100644 index 00000000000..d9f8fca15e1 --- /dev/null +++ b/common/concurrency/limits/src/test/java/io/helidon/common/concurrency/limits/FixedLimitTest.java @@ -0,0 +1,211 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.common.concurrency.limits; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.junit.jupiter.api.Test; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.lessThanOrEqualTo; + +public class FixedLimitTest { + @Test + public void testUnlimited() throws InterruptedException { + FixedLimit limiter = FixedLimit.create(); + int concurrency = 5; + CountDownLatch cdl = new CountDownLatch(1); + CountDownLatch threadsCdl = new CountDownLatch(concurrency); + + Lock lock = new ReentrantLock(); + List result = new ArrayList<>(concurrency); + + Thread[] threads = new Thread[concurrency]; + for (int i = 0; i < concurrency; i++) { + int index = i; + threads[i] = new Thread(() -> { + try { + limiter.invoke(() -> { + threadsCdl.countDown(); + cdl.await(10, TimeUnit.SECONDS); + lock.lock(); + try { + result.add("result_" + index); + } finally { + lock.unlock(); + } + return null; + }); + } catch (Exception e) { + threadsCdl.countDown(); + throw new RuntimeException(e); + } + }); + } + for (Thread thread : threads) { + thread.start(); + } + threadsCdl.await(); + cdl.countDown(); + for (Thread thread : threads) { + thread.join(Duration.ofSeconds(5)); + } + assertThat(result, hasSize(concurrency)); + } + + @Test + public void testLimit() throws Exception { + FixedLimit limiter = FixedLimit.builder() + .permits(1) + .build(); + + int concurrency = 5; + CountDownLatch cdl = new CountDownLatch(1); + CountDownLatch threadsCdl = new CountDownLatch(concurrency); + + Lock lock = new ReentrantLock(); + List result = new ArrayList<>(concurrency); + AtomicInteger failures = new AtomicInteger(); + + Thread[] threads = new Thread[concurrency]; + for (int i = 0; i < concurrency; i++) { + int index = i; + threads[i] = new Thread(() -> { + try { + limiter.invoke(() -> { + threadsCdl.countDown(); + cdl.await(10, TimeUnit.SECONDS); + lock.lock(); + try { + result.add("result_" + index); + } finally { + lock.unlock(); + } + return null; + }); + } catch (LimitException e) { + threadsCdl.countDown(); + failures.incrementAndGet(); + } catch (Exception e) { + threadsCdl.countDown(); + throw new RuntimeException(e); + } + }); + } + + for (Thread thread : threads) { + thread.start(); + } + // wait for all threads to reach appropriate destination + threadsCdl.await(); + cdl.countDown(); + for (Thread thread : threads) { + thread.join(Duration.ofSeconds(5)); + } + assertThat(failures.get(), is(concurrency - 1)); + assertThat(result.size(), is(1)); + } + + @Test + public void testLimitWithQueue() throws Exception { + FixedLimit limiter = FixedLimit.builder() + .permits(1) + .queueLength(1) + .queueTimeout(Duration.ofSeconds(5)) + .build(); + + int concurrency = 5; + CountDownLatch cdl = new CountDownLatch(1); + + Lock lock = new ReentrantLock(); + List result = new ArrayList<>(concurrency); + AtomicInteger failures = new AtomicInteger(); + + Thread[] threads = new Thread[concurrency]; + for (int i = 0; i < concurrency; i++) { + int index = i; + threads[i] = new Thread(() -> { + try { + limiter.invoke(() -> { + cdl.await(10, TimeUnit.SECONDS); + lock.lock(); + try { + result.add("result_" + index); + } finally { + lock.unlock(); + } + return null; + }); + } catch (LimitException e) { + failures.incrementAndGet(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + for (Thread thread : threads) { + thread.start(); + } + // wait for the threads to reach their destination (either failed, or on cdl, or in queue) + TimeUnit.MILLISECONDS.sleep(100); + cdl.countDown(); + for (Thread thread : threads) { + thread.join(Duration.ofSeconds(5)); + } + // 1 submitted, 1 in queue (may be less failures, as the queue length is not guaranteed to be atomic + assertThat(failures.get(), lessThanOrEqualTo(concurrency - 2)); + // may be 2 or more (1 submitted, 1 or more queued) + assertThat(result.size(), greaterThanOrEqualTo(2)); + } + + @Test + public void testSemaphoreReleased() throws Exception { + Limit limit = FixedLimit.builder() + .permits(5) + .build(); + + for (int i = 0; i < 5000; i++) { + limit.invoke(() -> { + }); + } + } + + @Test + public void testSemaphoreReleasedWithQueue() throws Exception { + Limit limit = FixedLimit.builder() + .permits(5) + .queueLength(10) + .queueTimeout(Duration.ofMillis(100)) + .build(); + + for (int i = 0; i < 5000; i++) { + limit.invoke(() -> { + }); + } + } +} diff --git a/common/concurrency/limits/src/test/java/io/helidon/common/concurrency/limits/LimitUsingConfigBlueprint.java b/common/concurrency/limits/src/test/java/io/helidon/common/concurrency/limits/LimitUsingConfigBlueprint.java new file mode 100644 index 00000000000..35d99bb95c2 --- /dev/null +++ b/common/concurrency/limits/src/test/java/io/helidon/common/concurrency/limits/LimitUsingConfigBlueprint.java @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.common.concurrency.limits; + +import java.util.Optional; + +import io.helidon.builder.api.Option; +import io.helidon.builder.api.Prototype; +import io.helidon.common.concurrency.limits.spi.LimitProvider; + +@Prototype.Blueprint +@Prototype.Configured +interface LimitUsingConfigBlueprint { + @Option.Provider(value = LimitProvider.class, discoverServices = false) + @Option.Configured + Optional concurrencyLimit(); +} diff --git a/common/concurrency/limits/src/test/resources/application.yaml b/common/concurrency/limits/src/test/resources/application.yaml new file mode 100644 index 00000000000..69540cde1d1 --- /dev/null +++ b/common/concurrency/limits/src/test/resources/application.yaml @@ -0,0 +1,32 @@ +# +# Copyright (c) 2024 Oracle and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +first: + concurrency-limit: + - type: "fixed" + name: "server-listener" + fair: true + permits: 1 + queue-length: 20 + queue-timeout: "PT42S" +second: + concurrency-limit: + aimd: + timeout: "PT42S" + min-limit: 11 + max-limit: 22 + initial-limit: 14 + backoff-ratio: 0.74 diff --git a/common/concurrency/pom.xml b/common/concurrency/pom.xml new file mode 100644 index 00000000000..1346a2b660a --- /dev/null +++ b/common/concurrency/pom.xml @@ -0,0 +1,39 @@ + + + + + 4.0.0 + + io.helidon.common + helidon-common-project + 4.2.0-SNAPSHOT + ../pom.xml + + + io.helidon.common.concurrency + helidon-common-concurrency-project + Helidon Common Concurrency Project + + pom + + + limits + + diff --git a/common/pom.xml b/common/pom.xml index 0949c39bf92..b62636185ab 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -55,6 +55,7 @@ tls types uri + concurrency diff --git a/microprofile/websocket/pom.xml b/microprofile/websocket/pom.xml index b3b1ee999bb..ab00465935a 100644 --- a/microprofile/websocket/pom.xml +++ b/microprofile/websocket/pom.xml @@ -76,6 +76,10 @@ jakarta.websocket jakarta.websocket-client-api + + io.helidon.common.concurrency + helidon-common-concurrency-limits + io.helidon.common.features helidon-common-features-api diff --git a/microprofile/websocket/src/main/java/io/helidon/microprofile/tyrus/TyrusConnection.java b/microprofile/websocket/src/main/java/io/helidon/microprofile/tyrus/TyrusConnection.java index 581f8d0abeb..3172a3cd32c 100644 --- a/microprofile/websocket/src/main/java/io/helidon/microprofile/tyrus/TyrusConnection.java +++ b/microprofile/websocket/src/main/java/io/helidon/microprofile/tyrus/TyrusConnection.java @@ -25,8 +25,12 @@ import io.helidon.common.buffers.BufferData; import io.helidon.common.buffers.DataReader; +import io.helidon.common.concurrency.limits.FixedLimit; +import io.helidon.common.concurrency.limits.Limit; +import io.helidon.common.concurrency.limits.LimitException; import io.helidon.common.socket.SocketContext; import io.helidon.http.DateTime; +import io.helidon.webserver.CloseConnectionException; import io.helidon.webserver.ConnectionContext; import io.helidon.webserver.spi.ServerConnection; import io.helidon.websocket.WsCloseCodes; @@ -67,33 +71,45 @@ class TyrusConnection implements ServerConnection, WsSession { } @Override - public void handle(Semaphore requestSemaphore) { + public void handle(Limit limit) { myThread = Thread.currentThread(); DataReader dataReader = ctx.dataReader(); - listener.onOpen(this); - if (requestSemaphore.tryAcquire()) { + + try { + limit.invoke(() -> listener.onOpen(this)); + } catch (LimitException e) { + listener.onError(this, e); + throw new CloseConnectionException("Too many concurrent requests"); + } catch (Exception e) { + listener.onError(this, e); + listener.onClose(this, WsCloseCodes.UNEXPECTED_CONDITION, e.getMessage()); + return; + } + + while (canRun) { try { - while (canRun) { - try { - readingNetwork = true; - BufferData buffer = dataReader.readBuffer(); - readingNetwork = false; - lastRequestTimestamp = DateTime.timestamp(); - listener.onMessage(this, buffer, true); - lastRequestTimestamp = DateTime.timestamp(); - } catch (Exception e) { - listener.onError(this, e); - listener.onClose(this, WsCloseCodes.UNEXPECTED_CONDITION, e.getMessage()); - return; - } - } - listener.onClose(this, WsCloseCodes.NORMAL_CLOSE, "Idle timeout"); - } finally { - requestSemaphore.release(); + readingNetwork = true; + BufferData buffer = dataReader.readBuffer(); + readingNetwork = false; + lastRequestTimestamp = DateTime.timestamp(); + limit.invoke(() -> listener.onMessage(this, buffer, true)); + lastRequestTimestamp = DateTime.timestamp(); + } catch (LimitException e) { + listener.onClose(this, WsCloseCodes.TRY_AGAIN_LATER, "Too Many Concurrent Requests"); + return; + } catch (Exception e) { + listener.onError(this, e); + listener.onClose(this, WsCloseCodes.UNEXPECTED_CONDITION, e.getMessage()); + return; } - } else { - listener.onClose(this, WsCloseCodes.TRY_AGAIN_LATER, "Too Many Concurrent Requests"); } + listener.onClose(this, WsCloseCodes.NORMAL_CLOSE, "Idle timeout"); + } + + @SuppressWarnings("removal") + @Override + public void handle(Semaphore requestSemaphore) { + handle(FixedLimit.create(requestSemaphore)); } @Override diff --git a/microprofile/websocket/src/main/java/module-info.java b/microprofile/websocket/src/main/java/module-info.java index 939671a2800..da725db8de5 100644 --- a/microprofile/websocket/src/main/java/module-info.java +++ b/microprofile/websocket/src/main/java/module-info.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, 2023 Oracle and/or its affiliates. + * Copyright (c) 2020, 2024 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -46,6 +46,7 @@ requires static io.helidon.common.features.api; requires transitive jakarta.websocket; + requires transitive io.helidon.common.concurrency.limits; exports io.helidon.microprofile.tyrus; diff --git a/webserver/concurrency-limits/pom.xml b/webserver/concurrency-limits/pom.xml new file mode 100644 index 00000000000..61bb4bb3010 --- /dev/null +++ b/webserver/concurrency-limits/pom.xml @@ -0,0 +1,129 @@ + + + + 4.0.0 + + io.helidon.webserver + helidon-webserver-project + 4.2.0-SNAPSHOT + + + helidon-webserver-concurrency-limits + Helidon WebServer Concurrency Limits + Feature that adds filters for concurrency limits + + + + io.helidon.webserver + helidon-webserver + + + io.helidon.common.concurrency + helidon-common-concurrency-limits + + + io.helidon.common + helidon-common-config + + + helidon-builder-api + io.helidon.builder + + + io.helidon.webserver.testing.junit5 + helidon-webserver-testing-junit5 + test + + + org.junit.jupiter + junit-jupiter-api + test + + + org.hamcrest + hamcrest-all + test + + + io.helidon.logging + helidon-logging-jul + test + + + io.helidon.config + helidon-config-yaml + test + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + + io.helidon.config.metadata + helidon-config-metadata-codegen + ${helidon.version} + + + io.helidon.codegen + helidon-codegen-apt + ${helidon.version} + + + io.helidon.builder + helidon-builder-codegen + ${helidon.version} + + + io.helidon.codegen + helidon-codegen-helidon-copyright + ${helidon.version} + + + + + + io.helidon.config.metadata + helidon-config-metadata-codegen + ${helidon.version} + + + io.helidon.codegen + helidon-codegen-apt + ${helidon.version} + + + io.helidon.builder + helidon-builder-codegen + ${helidon.version} + + + io.helidon.codegen + helidon-codegen-helidon-copyright + ${helidon.version} + + + + + + diff --git a/webserver/concurrency-limits/src/main/java/io/helidon/webserver/concurrency/limits/LimitsFeature.java b/webserver/concurrency-limits/src/main/java/io/helidon/webserver/concurrency/limits/LimitsFeature.java new file mode 100644 index 00000000000..62f77a2cb10 --- /dev/null +++ b/webserver/concurrency-limits/src/main/java/io/helidon/webserver/concurrency/limits/LimitsFeature.java @@ -0,0 +1,147 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.webserver.concurrency.limits; + +import java.util.Set; +import java.util.function.Consumer; + +import io.helidon.builder.api.RuntimeType; +import io.helidon.common.Weighted; +import io.helidon.common.config.Config; +import io.helidon.webserver.WebServer; +import io.helidon.webserver.spi.ServerFeature; + +/** + * Server feature that adds limits as filters. + *

+ * When using this feature, the limits operation is enforced within a filter, i.e. after the request + * is accepted. This means it is used only for HTTP requests. + */ +@RuntimeType.PrototypedBy(LimitsFeatureConfig.class) +public class LimitsFeature implements ServerFeature, Weighted, RuntimeType.Api { + /** + * Default weight of this feature. It is the first feature to be registered (above context and access log). + *

+ * Context: 1100 + *

+ * Access Log: 1000 + *

+ * This feature: {@value} + */ + public static final double WEIGHT = 2000; + static final String ID = "limits"; + + private final LimitsFeatureConfig config; + + private LimitsFeature(LimitsFeatureConfig config) { + this.config = config; + } + + /** + * Fluent API builder to set up an instance. + * + * @return a new builder + */ + public static LimitsFeatureConfig.Builder builder() { + return LimitsFeatureConfig.builder(); + } + + /** + * Create a new instance from its configuration. + * + * @param config configuration + * @return a new feature + */ + public static LimitsFeature create(LimitsFeatureConfig config) { + return new LimitsFeature(config); + } + + /** + * Create a new instance customizing its configuration. + * + * @param builderConsumer consumer of configuration + * @return a new feature + */ + public static LimitsFeature create(Consumer builderConsumer) { + return builder() + .update(builderConsumer) + .build(); + } + + /** + * Create a new limits feature with default setup, but enabled. + * + * @return a new feature + */ + public static LimitsFeature create() { + return builder() + .enabled(true) + .build(); + } + + /** + * Create a new context feature with custom setup. + * + * @param config configuration + * @return a new configured feature + */ + public static LimitsFeature create(Config config) { + return builder() + .config(config) + .build(); + } + + @Override + public void setup(ServerFeatureContext featureContext) { + double featureWeight = config.weight(); + // all sockets + Set sockets = config.sockets(); + if (sockets.isEmpty()) { + // configure on default only + featureContext.socket(WebServer.DEFAULT_SOCKET_NAME) + .httpRouting() + .addFeature(new LimitsRoutingFeature(config, featureWeight)); + } else { + // configure on all configured + for (String socket : sockets) { + featureContext.socket(socket) + .httpRouting() + .addFeature(new LimitsRoutingFeature(config, featureWeight)); + } + } + } + + @Override + public String name() { + return config.name(); + } + + @Override + public String type() { + return ID; + } + + @Override + public double weight() { + return config.weight(); + } + + @Override + public LimitsFeatureConfig prototype() { + return config; + } +} diff --git a/webserver/concurrency-limits/src/main/java/io/helidon/webserver/concurrency/limits/LimitsFeatureConfigBlueprint.java b/webserver/concurrency-limits/src/main/java/io/helidon/webserver/concurrency/limits/LimitsFeatureConfigBlueprint.java new file mode 100644 index 00000000000..dc681c4c172 --- /dev/null +++ b/webserver/concurrency-limits/src/main/java/io/helidon/webserver/concurrency/limits/LimitsFeatureConfigBlueprint.java @@ -0,0 +1,76 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.webserver.concurrency.limits; + +import java.util.Optional; +import java.util.Set; + +import io.helidon.builder.api.Option; +import io.helidon.builder.api.Prototype; +import io.helidon.common.concurrency.limits.Limit; +import io.helidon.common.concurrency.limits.spi.LimitProvider; +import io.helidon.webserver.spi.ServerFeatureProvider; + +@Prototype.Blueprint +@Prototype.Configured(value = LimitsFeature.ID, root = false) +@Prototype.Provides(ServerFeatureProvider.class) +interface LimitsFeatureConfigBlueprint extends Prototype.Factory { + /** + * Weight of the context feature. As it is used by other features, the default is quite high: + * {@value LimitsFeature#WEIGHT}. + * + * @return weight of the feature + */ + @Option.DefaultDouble(LimitsFeature.WEIGHT) + @Option.Configured + double weight(); + + /** + * List of sockets to register this feature on. If empty, it would get registered on all sockets. + * + * @return socket names to register on, defaults to empty (all available sockets) + */ + @Option.Configured + Set sockets(); + + /** + * Name of this instance. + * + * @return instance name + */ + @Option.Default(LimitsFeature.ID) + String name(); + + /** + * Concurrency limit to use to limit concurrent execution of incoming requests. + * The default is to have unlimited concurrency. + * + * @return concurrency limit + */ + @Option.Provider(value = LimitProvider.class, discoverServices = false) + @Option.Configured + Optional concurrencyLimit(); + + /** + * Whether this feature is enabled, defaults to {@code true}. + * + * @return whether to enable this feature + */ + @Option.DefaultBoolean(true) + @Option.Configured + boolean enabled(); +} diff --git a/webserver/concurrency-limits/src/main/java/io/helidon/webserver/concurrency/limits/LimitsFeatureProvider.java b/webserver/concurrency-limits/src/main/java/io/helidon/webserver/concurrency/limits/LimitsFeatureProvider.java new file mode 100644 index 00000000000..5d26cf8e461 --- /dev/null +++ b/webserver/concurrency-limits/src/main/java/io/helidon/webserver/concurrency/limits/LimitsFeatureProvider.java @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.webserver.concurrency.limits; + +import io.helidon.common.Weight; +import io.helidon.common.config.Config; +import io.helidon.webserver.spi.ServerFeatureProvider; + +/** + * {@link java.util.ServiceLoader} provider implementation to automatically register this service. + *

+ * The required configuration (disabled by default): + *

+ * server:
+ *   features:
+ *     limits:
+ *       enabled: true
+ *       limit:
+ *         bulkhead:
+ *         limit: 10
+ *         queue: 100
+ * 
+ */ +@Weight(LimitsFeature.WEIGHT) +public class LimitsFeatureProvider implements ServerFeatureProvider { + /** + * Public constructor required by {@link java.util.ServiceLoader}. + */ + public LimitsFeatureProvider() { + } + + @Override + public String configKey() { + return LimitsFeature.ID; + } + + @Override + public LimitsFeature create(Config config, String name) { + return LimitsFeature.builder() + .config(config) + .name(name) + .build(); + } +} diff --git a/webserver/concurrency-limits/src/main/java/io/helidon/webserver/concurrency/limits/LimitsRoutingFeature.java b/webserver/concurrency-limits/src/main/java/io/helidon/webserver/concurrency/limits/LimitsRoutingFeature.java new file mode 100644 index 00000000000..4e8fc48328c --- /dev/null +++ b/webserver/concurrency-limits/src/main/java/io/helidon/webserver/concurrency/limits/LimitsRoutingFeature.java @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.webserver.concurrency.limits; + +import java.util.Optional; + +import io.helidon.common.Weighted; +import io.helidon.common.concurrency.limits.Limit; +import io.helidon.common.concurrency.limits.LimitAlgorithm; +import io.helidon.http.HttpException; +import io.helidon.http.Status; +import io.helidon.webserver.http.FilterChain; +import io.helidon.webserver.http.HttpFeature; +import io.helidon.webserver.http.HttpRouting; +import io.helidon.webserver.http.RoutingRequest; +import io.helidon.webserver.http.RoutingResponse; + +class LimitsRoutingFeature implements HttpFeature, Weighted { + private final double featureWeight; + private final Limit limits; + private final boolean enabled; + + LimitsRoutingFeature(LimitsFeatureConfig config, double featureWeight) { + this.featureWeight = featureWeight; + this.limits = config.concurrencyLimit().orElse(null); + this.enabled = config.enabled(); + } + + @Override + public void setup(HttpRouting.Builder builder) { + if (enabled && limits != null) { + builder.addFilter(this::filter); + } + } + + @Override + public double weight() { + return featureWeight; + } + + private void filter(FilterChain chain, RoutingRequest req, RoutingResponse res) { + Optional token = limits.tryAcquire(); + + if (token.isEmpty()) { + throw new HttpException("Limit exceeded", Status.SERVICE_UNAVAILABLE_503); + } + + LimitAlgorithm.Token permit = token.get(); + try { + chain.proceed(); + permit.success(); + } catch (Throwable e) { + permit.dropped(); + throw e; + } + } +} diff --git a/webserver/concurrency-limits/src/main/java/io/helidon/webserver/concurrency/limits/package-info.java b/webserver/concurrency-limits/src/main/java/io/helidon/webserver/concurrency/limits/package-info.java new file mode 100644 index 00000000000..d1075d62ffb --- /dev/null +++ b/webserver/concurrency-limits/src/main/java/io/helidon/webserver/concurrency/limits/package-info.java @@ -0,0 +1,20 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * An implementation of a feature to protect all server requests with a limit. + */ +package io.helidon.webserver.concurrency.limits; diff --git a/webserver/concurrency-limits/src/main/java/module-info.java b/webserver/concurrency-limits/src/main/java/module-info.java new file mode 100644 index 00000000000..d1d99c99b8a --- /dev/null +++ b/webserver/concurrency-limits/src/main/java/module-info.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Limits feature for Helidon WebServer. + */ +module io.helidon.webserver.concurrency.limits { + requires io.helidon.common; + requires io.helidon.http; + requires io.helidon.webserver; + + requires transitive io.helidon.builder.api; + requires transitive io.helidon.common.config; + requires transitive io.helidon.common.concurrency.limits; + + exports io.helidon.webserver.concurrency.limits; + + provides io.helidon.webserver.spi.ServerFeatureProvider + with io.helidon.webserver.concurrency.limits.LimitsFeatureProvider; + + uses io.helidon.common.concurrency.limits.spi.LimitProvider; +} \ No newline at end of file diff --git a/webserver/concurrency-limits/src/test/java/io/helidon/webserver/concurrency/limits/FixedLimitTest.java b/webserver/concurrency-limits/src/test/java/io/helidon/webserver/concurrency/limits/FixedLimitTest.java new file mode 100644 index 00000000000..71921d7ba81 --- /dev/null +++ b/webserver/concurrency-limits/src/test/java/io/helidon/webserver/concurrency/limits/FixedLimitTest.java @@ -0,0 +1,87 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.webserver.concurrency.limits; + +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import io.helidon.http.Status; +import io.helidon.webclient.api.ClientResponseTyped; +import io.helidon.webclient.http1.Http1Client; +import io.helidon.webserver.http.HttpRules; +import io.helidon.webserver.testing.junit5.ServerTest; +import io.helidon.webserver.testing.junit5.SetUpRoute; + +import org.junit.jupiter.api.Test; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +@ServerTest +public class FixedLimitTest { + private static final CountDownLatch FIRST_ENCOUNTER = new CountDownLatch(1); + private static final CountDownLatch FINISH_LATCH = new CountDownLatch(1); + + private final Http1Client client; + + public FixedLimitTest(Http1Client client) { + this.client = client; + } + + @SetUpRoute + public static void route(HttpRules rules) { + rules.get("/greet", (req, res) -> res.send("Hello")) + .get("/wait", (req, res) -> { + FIRST_ENCOUNTER.countDown(); + FINISH_LATCH.await(); + res.send("finished"); + }); + } + + @Test + public void testRequest() { + var response = client.get("/greet") + .request(String.class); + + assertThat(response.status(), is(Status.OK_200)); + assertThat(response.entity(), is("Hello")); + } + + @Test + public void testLimits() throws Exception { + Callable> callable = () -> { + return client.get("/wait") + .request(String.class); + }; + try (ExecutorService es = Executors.newThreadPerTaskExecutor(Thread.ofVirtual().factory())) { + var first = es.submit(callable); + FIRST_ENCOUNTER.await(); + var secondResponse = es.submit(callable) + .get(5, TimeUnit.SECONDS); + + assertThat(secondResponse.status(), is(Status.SERVICE_UNAVAILABLE_503)); + FINISH_LATCH.countDown(); + var firstResponse = first.get(5, TimeUnit.SECONDS); + assertThat(firstResponse.status(), is(Status.OK_200)); + assertThat(firstResponse.entity(), is("finished")); + + } + } +} diff --git a/webserver/concurrency-limits/src/test/resources/application.yaml b/webserver/concurrency-limits/src/test/resources/application.yaml new file mode 100644 index 00000000000..beb5d7af950 --- /dev/null +++ b/webserver/concurrency-limits/src/test/resources/application.yaml @@ -0,0 +1,24 @@ +# +# Copyright (c) 2024 Oracle and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +server: + features: + limits: + concurrency-limit: + fixed: + permits: 1 + queue-length: 0 + diff --git a/webserver/concurrency-limits/src/test/resources/logging.properties b/webserver/concurrency-limits/src/test/resources/logging.properties new file mode 100644 index 00000000000..eb45bdbe3fa --- /dev/null +++ b/webserver/concurrency-limits/src/test/resources/logging.properties @@ -0,0 +1,21 @@ +# +# Copyright (c) 2024 Oracle and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +handlers=java.util.logging.ConsoleHandler +java.util.logging.SimpleFormatter.format=%1$tY.%1$tm.%1$td %1$tH:%1$tM:%1$tS.%1$tL %5$s%6$s%n +# Global logging level. Can be overridden by specific loggers +.level=INFO +io.helidon.webserver.level=INFO diff --git a/webserver/http2/pom.xml b/webserver/http2/pom.xml index d2758b26274..85461395674 100644 --- a/webserver/http2/pom.xml +++ b/webserver/http2/pom.xml @@ -44,6 +44,10 @@ io.helidon.builder helidon-builder-api
+ + io.helidon.common.concurrency + helidon-common-concurrency-limits + io.helidon.common.features helidon-common-features-api diff --git a/webserver/http2/src/main/java/io/helidon/webserver/http2/Http2Connection.java b/webserver/http2/src/main/java/io/helidon/webserver/http2/Http2Connection.java index fa70fa15fb9..295672a6130 100644 --- a/webserver/http2/src/main/java/io/helidon/webserver/http2/Http2Connection.java +++ b/webserver/http2/src/main/java/io/helidon/webserver/http2/Http2Connection.java @@ -27,6 +27,8 @@ import io.helidon.common.buffers.BufferData; import io.helidon.common.buffers.DataReader; +import io.helidon.common.concurrency.limits.FixedLimit; +import io.helidon.common.concurrency.limits.Limit; import io.helidon.common.task.InterruptableTask; import io.helidon.common.tls.TlsUtils; import io.helidon.http.DateTime; @@ -172,9 +174,9 @@ private static void applySetting(Http2Settings.Builder builder, long value, Http } @Override - public void handle(Semaphore requestSemaphore) throws InterruptedException { + public void handle(Limit limit) throws InterruptedException { try { - doHandle(requestSemaphore); + doHandle(limit); } catch (Http2Exception e) { if (state == State.FINISHED) { // already handled @@ -209,6 +211,12 @@ public void handle(Semaphore requestSemaphore) throws InterruptedException { } } + @SuppressWarnings("removal") + @Override + public void handle(Semaphore requestSemaphore) throws InterruptedException { + handle(FixedLimit.create(requestSemaphore)); + } + /** * Client settings, obtained from SETTINGS frame or HTTP/2 upgrade request. * @@ -326,7 +334,7 @@ Http2Settings clientSettings() { return clientSettings; } - private void doHandle(Semaphore requestSemaphore) throws InterruptedException { + private void doHandle(Limit limit) throws InterruptedException { myThread = Thread.currentThread(); while (canRun && state != State.FINISHED) { if (expectPreface && state != State.WRITE_SERVER_SETTINGS) { @@ -341,10 +349,9 @@ private void doHandle(Semaphore requestSemaphore) throws InterruptedException { // no data to read -> connection is closed throw new CloseConnectionException("Connection closed by client", e); } - dispatchHandler(requestSemaphore); - } else { - dispatchHandler(requestSemaphore); } + + dispatchHandler(limit); } if (state != State.FINISHED) { Http2GoAway frame = new Http2GoAway(0, Http2ErrorCode.NO_ERROR, "Idle timeout"); @@ -352,7 +359,7 @@ private void doHandle(Semaphore requestSemaphore) throws InterruptedException { } } - private void dispatchHandler(Semaphore requestSemaphore) { + private void dispatchHandler(Limit limit) { switch (state) { case CONTINUATION -> doContinuation(); case WRITE_SERVER_SETTINGS -> writeServerSettings(); @@ -360,7 +367,7 @@ private void dispatchHandler(Semaphore requestSemaphore) { case SETTINGS -> doSettings(); case ACK_SETTINGS -> ackSettings(); case DATA -> dataFrame(); - case HEADERS -> doHeaders(requestSemaphore); + case HEADERS -> doHeaders(limit); case PRIORITY -> doPriority(); case READ_PUSH_PROMISE -> throw new Http2Exception(Http2ErrorCode.REFUSED_STREAM, "Push promise not supported"); case PING -> pingFrame(); @@ -606,7 +613,7 @@ private void dataFrame() { state = State.READ_FRAME; } - private void doHeaders(Semaphore requestSemaphore) { + private void doHeaders(Limit limit) { int streamId = frameHeader.streamId(); StreamContext streamContext = stream(streamId); @@ -675,7 +682,7 @@ private void doHeaders(Semaphore requestSemaphore) { path, http2Config.validatePath()); stream.prologue(httpPrologue); - stream.requestSemaphore(requestSemaphore); + stream.requestLimit(limit); stream.headers(headers, endOfStream); state = State.READ_FRAME; diff --git a/webserver/http2/src/main/java/io/helidon/webserver/http2/Http2ServerStream.java b/webserver/http2/src/main/java/io/helidon/webserver/http2/Http2ServerStream.java index bee1f707da6..50333162e29 100644 --- a/webserver/http2/src/main/java/io/helidon/webserver/http2/Http2ServerStream.java +++ b/webserver/http2/src/main/java/io/helidon/webserver/http2/Http2ServerStream.java @@ -18,11 +18,15 @@ import java.io.UncheckedIOException; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Semaphore; import io.helidon.common.buffers.BufferData; +import io.helidon.common.concurrency.limits.FixedLimit; +import io.helidon.common.concurrency.limits.Limit; +import io.helidon.common.concurrency.limits.LimitAlgorithm; import io.helidon.common.socket.SocketWriterException; import io.helidon.http.DirectHandler; import io.helidon.http.Header; @@ -99,10 +103,9 @@ class Http2ServerStream implements Runnable, Http2Stream { private Http2SubProtocolSelector.SubProtocolHandler subProtocolHandler; private long expectedLength = -1; private HttpPrologue prologue; - // create a semaphore if accessed before we get the one from connection + // create a limit if accessed before we get the one from connection // must be volatile, as it is accessed both from connection thread and from stream thread - private volatile Semaphore requestSemaphore = new Semaphore(1); - private boolean semaphoreAcquired; + private volatile Limit requestLimit = FixedLimit.create(new Semaphore(1)); /** * A new HTTP/2 server stream. @@ -324,9 +327,6 @@ public void run() { } finally { headers = null; subProtocolHandler = null; - if (semaphoreAcquired) { - requestSemaphore.release(); - } } } @@ -425,8 +425,8 @@ void write100Continue() { } } - void requestSemaphore(Semaphore requestSemaphore) { - this.requestSemaphore = requestSemaphore; + void requestLimit(Limit limit) { + this.requestLimit = limit; } void prologue(HttpPrologue prologue) { @@ -516,15 +516,35 @@ private void handle() { streamId, this::readEntityFromPipeline); Http2ServerResponse response = new Http2ServerResponse(this, request); - semaphoreAcquired = requestSemaphore.tryAcquire(); + try { - if (semaphoreAcquired) { - routing.route(ctx, request, response); - } else { + Optional token = requestLimit.tryAcquire(); + + if (token.isEmpty()) { ctx.log(LOGGER, TRACE, "Too many concurrent requests, rejecting request."); response.status(Status.SERVICE_UNAVAILABLE_503) .send("Too Many Concurrent Requests"); response.commit(); + } else { + LimitAlgorithm.Token permit = token.get(); + try { + routing.route(ctx, request, response); + } finally { + if (response.status() == Status.NOT_FOUND_404) { + permit.ignore(); + } else { + switch (response.status().family()) { + case INFORMATIONAL: + case SUCCESSFUL: + case REDIRECTION: + permit.success(); + break; + default: + permit.dropped(); + break; + } + } + } } } finally { request.content().consume(); diff --git a/webserver/http2/src/main/java/module-info.java b/webserver/http2/src/main/java/module-info.java index 73913df2958..108309d0231 100644 --- a/webserver/http2/src/main/java/module-info.java +++ b/webserver/http2/src/main/java/module-info.java @@ -40,6 +40,7 @@ requires transitive io.helidon.http.media; requires transitive io.helidon.http; requires transitive io.helidon.webserver; + requires transitive io.helidon.common.concurrency.limits; exports io.helidon.webserver.http2; exports io.helidon.webserver.http2.spi; diff --git a/webserver/pom.xml b/webserver/pom.xml index da19bb97dad..e7947386427 100644 --- a/webserver/pom.xml +++ b/webserver/pom.xml @@ -46,6 +46,7 @@ testing webserver websocket + concurrency-limits diff --git a/webserver/webserver/pom.xml b/webserver/webserver/pom.xml index b41d29d7e42..77bb9ada598 100644 --- a/webserver/webserver/pom.xml +++ b/webserver/webserver/pom.xml @@ -80,6 +80,10 @@ io.helidon.common.features helidon-common-features + + io.helidon.common.concurrency + helidon-common-concurrency-limits +