From bba43e5513f34ffba30109e2b9e01548c9202232 Mon Sep 17 00:00:00 2001 From: Brian Pontarelli Date: Tue, 29 Aug 2023 17:28:27 -0600 Subject: [PATCH] Changed the non-blocking queue to block so that large responses with lots of clients doesn't cause OOMEs. This also required adding in additional failure cases when clients are reading or writing bytes too slowly with the server. --- build.savant | 2 +- .../body/response/ChunkedBodyProcessor.java | 8 +- .../response/ContentLengthBodyProcessor.java | 8 +- ...va => BlockingByteBufferOutputStream.java} | 29 +++-- .../fusionauth/http/server/Configurable.java | 48 ++++++- .../http/server/HTTP11Processor.java | 73 ++++++++++- .../fusionauth/http/server/HTTPProcessor.java | 10 ++ .../http/server/HTTPResponseProcessor.java | 6 +- .../http/server/HTTPS11Processor.java | 12 +- .../http/server/HTTPServerConfiguration.java | 123 ++++++++++++++++-- .../http/server/HTTPServerThread.java | 75 +++++++---- .../java/io/fusionauth/http/BaseTest.java | 4 +- .../java/io/fusionauth/http/CoreTest.java | 72 +++++++++- 13 files changed, 395 insertions(+), 75 deletions(-) rename src/main/java/io/fusionauth/http/io/{NonBlockingByteBufferOutputStream.java => BlockingByteBufferOutputStream.java} (81%) diff --git a/build.savant b/build.savant index 44b1fb2..8143a7e 100644 --- a/build.savant +++ b/build.savant @@ -17,7 +17,7 @@ jackson5Version = "3.0.1" restifyVersion = "4.2.0" testngVersion = "7.8.0" -project(group: "io.fusionauth", name: "java-http", version: "0.2.2", licenses: ["ApacheV2_0"]) { +project(group: "io.fusionauth", name: "java-http", version: "0.2.3", licenses: ["ApacheV2_0"]) { workflow { fetch { // Dependency resolution order: diff --git a/src/main/java/io/fusionauth/http/body/response/ChunkedBodyProcessor.java b/src/main/java/io/fusionauth/http/body/response/ChunkedBodyProcessor.java index 949c91f..a210955 100644 --- a/src/main/java/io/fusionauth/http/body/response/ChunkedBodyProcessor.java +++ b/src/main/java/io/fusionauth/http/body/response/ChunkedBodyProcessor.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022, FusionAuth, All Rights Reserved + * Copyright (c) 2022-2023, FusionAuth, All Rights Reserved * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,7 +18,7 @@ import java.nio.ByteBuffer; import io.fusionauth.http.HTTPValues.ControlBytes; -import io.fusionauth.http.io.NonBlockingByteBufferOutputStream; +import io.fusionauth.http.io.BlockingByteBufferOutputStream; /** * A body processor that handles chunked requests/responses. @@ -32,9 +32,9 @@ public class ChunkedBodyProcessor implements BodyProcessor { private final ByteBuffer[] currentBuffers = new ByteBuffer[3]; - private final NonBlockingByteBufferOutputStream outputStream; + private final BlockingByteBufferOutputStream outputStream; - public ChunkedBodyProcessor(NonBlockingByteBufferOutputStream outputStream) { + public ChunkedBodyProcessor(BlockingByteBufferOutputStream outputStream) { this.outputStream = outputStream; } diff --git a/src/main/java/io/fusionauth/http/body/response/ContentLengthBodyProcessor.java b/src/main/java/io/fusionauth/http/body/response/ContentLengthBodyProcessor.java index 34892e5..5208001 100644 --- a/src/main/java/io/fusionauth/http/body/response/ContentLengthBodyProcessor.java +++ b/src/main/java/io/fusionauth/http/body/response/ContentLengthBodyProcessor.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022, FusionAuth, All Rights Reserved + * Copyright (c) 2022-2023, FusionAuth, All Rights Reserved * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,7 +17,7 @@ import java.nio.ByteBuffer; -import io.fusionauth.http.io.NonBlockingByteBufferOutputStream; +import io.fusionauth.http.io.BlockingByteBufferOutputStream; /** * A body processor that uses the Content-Length header to determine when the entire body has been read. @@ -27,9 +27,9 @@ public class ContentLengthBodyProcessor implements BodyProcessor { private final ByteBuffer[] currentBuffers = new ByteBuffer[1]; - private final NonBlockingByteBufferOutputStream outputStream; + private final BlockingByteBufferOutputStream outputStream; - public ContentLengthBodyProcessor(NonBlockingByteBufferOutputStream outputStream) { + public ContentLengthBodyProcessor(BlockingByteBufferOutputStream outputStream) { this.outputStream = outputStream; } diff --git a/src/main/java/io/fusionauth/http/io/NonBlockingByteBufferOutputStream.java b/src/main/java/io/fusionauth/http/io/BlockingByteBufferOutputStream.java similarity index 81% rename from src/main/java/io/fusionauth/http/io/NonBlockingByteBufferOutputStream.java rename to src/main/java/io/fusionauth/http/io/BlockingByteBufferOutputStream.java index ed63fe2..159fbc7 100644 --- a/src/main/java/io/fusionauth/http/io/NonBlockingByteBufferOutputStream.java +++ b/src/main/java/io/fusionauth/http/io/BlockingByteBufferOutputStream.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022, FusionAuth, All Rights Reserved + * Copyright (c) 2022-2023, FusionAuth, All Rights Reserved * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,24 +17,25 @@ import java.io.OutputStream; import java.nio.ByteBuffer; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import io.fusionauth.http.server.Notifier; /** * This InputStream uses ByteBuffers read by the Server processor and piped into this class. Those ByteBuffers are then fed to the reader of - * the InputStream. The pushing of ByteBuffers never blocks but the reading will block if there are no more bytes to read. + * the InputStream. The pushing of ByteBuffers blocks based on the configuration and the reading will block if there are no more bytes to + * read. *

* In order to handle chunking and other considerations, this class can be sub-classed to preprocess bytes. * * @author Brian Pontarelli */ -public class NonBlockingByteBufferOutputStream extends OutputStream { +public class BlockingByteBufferOutputStream extends OutputStream { private final int bufferSize; - // Shared between writer and reader threads. No one blocks using this. - private final Queue buffers = new ConcurrentLinkedQueue<>(); + // Shared between writer and reader threads. + private final BlockingQueue buffers; private final Notifier notifier; @@ -45,9 +46,10 @@ public class NonBlockingByteBufferOutputStream extends OutputStream { private volatile boolean used; - public NonBlockingByteBufferOutputStream(Notifier notifier, int bufferSize) { - this.notifier = notifier; + public BlockingByteBufferOutputStream(Notifier notifier, int bufferSize, int maximumQueueSize) { + this.buffers = new LinkedBlockingQueue<>(maximumQueueSize); this.bufferSize = bufferSize; + this.notifier = notifier; } public void clear() { @@ -149,8 +151,13 @@ public void write(byte[] b, int off, int len) { private void addBuffer(boolean notify) { currentBuffer.flip(); - if (!buffers.offer(currentBuffer)) { - throw new IllegalStateException("The ConcurrentLinkedQueue is borked. It should never reject an offer() operation."); + + try { + // This will block until we have capacity. We looked and it seems as though there aren't any ways that a Worker thread can be in a + // state where it was interrupted by the HTTPServerThread and this line of code doesn't throw an InterruptedException. + buffers.put(currentBuffer); + } catch (InterruptedException e) { + throw new IllegalStateException(e); } currentBuffer = null; diff --git a/src/main/java/io/fusionauth/http/server/Configurable.java b/src/main/java/io/fusionauth/http/server/Configurable.java index fc25bde..54f3fc4 100644 --- a/src/main/java/io/fusionauth/http/server/Configurable.java +++ b/src/main/java/io/fusionauth/http/server/Configurable.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022, FusionAuth, All Rights Reserved + * Copyright (c) 2022-2023, FusionAuth, All Rights Reserved * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -140,6 +140,20 @@ default T withLoggerFactory(LoggerFactory loggerFactory) { return (T) this; } + /** + * Sets the maximum length of the output buffer queue. Defaults to 128. + *

+ * This parameter will affect the runtime memory requirement of the server. This can be calculated by multiplying this value by values + * returned from {@link HTTPServerConfiguration#getResponseBufferSize()} and {@link HTTPServerConfiguration#getNumberOfWorkerThreads()}. + * + * @param outputBufferQueueLength The length of the output buffer queue. + * @return This. + */ + default T withMaxOutputBufferQueueLength(int outputBufferQueueLength) { + configuration().withMaxOutputBufferQueueLength(outputBufferQueueLength); + return (T) this; + } + /** * Sets the max preamble length (the start-line and headers constitute the head). Defaults to 64k * @@ -151,6 +165,30 @@ default T withMaxPreambleLength(int maxLength) { return (T) this; } + /** + * This configures the minimum number of bytes per second that a client must send a request to the server before the server closes the + * connection. + * + * @param bytesPerSecond The bytes per second throughput. + * @return This. + */ + default T withMinimumReadThroughput(long bytesPerSecond) { + configuration().withMinimumReadThroughput(bytesPerSecond); + return (T) this; + } + + /** + * This configures the minimum number of bytes per second that a client must read the response from the server before the server closes + * the connection. + * + * @param bytesPerSecond The bytes per second throughput. + * @return This. + */ + default T withMinimumWriteThroughput(long bytesPerSecond) { + configuration().withMinimumWriteThroughput(bytesPerSecond); + return (T) this; + } + /** * Sets the size of the buffer that is used to process the multipart request body. This defaults to 16k. * @@ -164,6 +202,10 @@ default T withMultipartBufferSize(int multipartBufferSize) { /** * Sets the number of worker threads that will handle requests coming into the HTTP server. Defaults to 40. + *

+ * This parameter will affect the runtime memory requirement of the server. This can be calculated by multiplying this value by the + * returned from {@link HTTPServerConfiguration#getResponseBufferSize()} and + * {@link HTTPServerConfiguration#getMaxOutputBufferQueueLength()}. * * @param numberOfWorkerThreads The number of worker threads. * @return This. @@ -197,6 +239,10 @@ default T withRequestBufferSize(int requestBufferSize) { /** * Sets the size of the buffer that is used to process the HTTP response. This defaults to 16k. + *

+ * This parameter will affect the runtime memory requirement of the server. This can be calculated by multiplying this value by the + * returned from {@link HTTPServerConfiguration#getNumberOfWorkerThreads()} and + * {@link HTTPServerConfiguration#getMaxOutputBufferQueueLength()}. * * @param responseBufferSize The size of the buffer. * @return This. diff --git a/src/main/java/io/fusionauth/http/server/HTTP11Processor.java b/src/main/java/io/fusionauth/http/server/HTTP11Processor.java index 64d1c0a..bd3d499 100644 --- a/src/main/java/io/fusionauth/http/server/HTTP11Processor.java +++ b/src/main/java/io/fusionauth/http/server/HTTP11Processor.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022, FusionAuth, All Rights Reserved + * Copyright (c) 2022-2023, FusionAuth, All Rights Reserved * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,8 +18,9 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; +import java.util.concurrent.Future; -import io.fusionauth.http.io.NonBlockingByteBufferOutputStream; +import io.fusionauth.http.io.BlockingByteBufferOutputStream; import io.fusionauth.http.log.Logger; import io.fusionauth.http.util.ThreadPool; @@ -47,6 +48,18 @@ public class HTTP11Processor implements HTTPProcessor { private final ThreadPool threadPool; + private long bytesRead; + + private long bytesWritten; + + private long firstByteReadInstant = -1; + + private long firstByteWroteInstant = -1; + + private Future future; + + private long lastByteReadInstant = -1; + private long lastUsed = System.currentTimeMillis(); private volatile ProcessorState state; @@ -63,7 +76,7 @@ public HTTP11Processor(HTTPServerConfiguration configuration, HTTPListenerConfig this.request = new HTTPRequest(configuration.getContextPath(), configuration.getMultipartBufferSize(), listener.isTLS() ? "https" : "http", listener.getPort(), ipAddress); this.requestProcessor = new HTTPRequestProcessor(configuration, request); - NonBlockingByteBufferOutputStream outputStream = new NonBlockingByteBufferOutputStream(notifier, configuration.getResponseBufferSize()); + BlockingByteBufferOutputStream outputStream = new BlockingByteBufferOutputStream(notifier, configuration.getResponseBufferSize(), configuration.getMaxOutputBufferQueueLength()); this.response = new HTTPResponse(outputStream, request, configuration.isCompressByDefault()); this.responseProcessor = new HTTPResponseProcessor(configuration, request, response, outputStream); } @@ -72,6 +85,9 @@ public HTTP11Processor(HTTPServerConfiguration configuration, HTTPListenerConfig public ProcessorState close(boolean endOfStream) { logger.trace("(C)"); + // Interrupt the thread if it is still running + future.cancel(true); + // Set the state to Close and return it state = ProcessorState.Close; return state; @@ -119,6 +135,13 @@ public void markUsed() { public ProcessorState read(ByteBuffer buffer) throws IOException { markUsed(); + bytesRead += buffer.remaining(); + if (firstByteReadInstant == -1) { + lastByteReadInstant = firstByteReadInstant = System.currentTimeMillis(); + } else { + lastByteReadInstant = System.currentTimeMillis(); + } + logger.trace("(R)"); RequestState requestState = requestProcessor.state(); @@ -130,7 +153,7 @@ public ProcessorState read(ByteBuffer buffer) throws IOException { // If the next state is not preamble, that means we are done processing that and ready to handle the request in a separate thread if (requestState != RequestState.Preamble && requestState != RequestState.Expect) { logger.trace("(RWo)"); - threadPool.submit(new HTTPWorker(configuration.getHandler(), configuration.getLoggerFactory(), this, request, response)); + future = threadPool.submit(new HTTPWorker(configuration.getHandler(), configuration.getLoggerFactory(), this, request, response)); } } else { logger.trace("(RB)"); @@ -174,6 +197,25 @@ public ByteBuffer readBuffer() { return buffer; } + @Override + public long readThroughput() { + // Haven't read anything yet, or we read everything in the first read (instants are equal) + if (firstByteReadInstant == -1 || bytesRead == 0 || lastByteReadInstant == firstByteReadInstant) { + return Long.MAX_VALUE; + } + + if (firstByteWroteInstant == -1) { + long millis = System.currentTimeMillis() - firstByteReadInstant; + if (millis < 2_000) { + return Long.MAX_VALUE; + } + + return (bytesRead / millis) * 1_000; + } + + return (bytesRead / (lastByteReadInstant - firstByteReadInstant)) * 1_000; + } + @Override public ProcessorState state() { return state; @@ -189,10 +231,31 @@ public ByteBuffer[] writeBuffers() { return null; } + @Override + public long writeThroughput() { + // Haven't written anything yet or not enough time has passed to calculated throughput (2s) + if (firstByteWroteInstant == -1 || bytesWritten == 0) { + return Long.MAX_VALUE; + } + + long millis = System.currentTimeMillis() - firstByteWroteInstant; + if (millis < 2_000) { + return Long.MAX_VALUE; + } + + // Always use `now` since this calculation is ongoing until the client reads all the bytes + return (bytesWritten / millis) * 1_000; + } + @Override public ProcessorState wrote(long num) { markUsed(); + bytesWritten += num; + if (firstByteWroteInstant == -1) { + firstByteWroteInstant = System.currentTimeMillis(); + } + if (num > 0) { logger.trace("(W)"); response.setCommitted(true); @@ -206,7 +269,7 @@ public ProcessorState wrote(long num) { // Flip back to reading and back to the preamble state, so we write the real response headers. Then start the worker thread and flip the ops requestProcessor.resetState(RequestState.Body); responseProcessor.resetState(ResponseState.Preamble); - threadPool.submit(new HTTPWorker(configuration.getHandler(), configuration.getLoggerFactory(), this, request, response)); + future = threadPool.submit(new HTTPWorker(configuration.getHandler(), configuration.getLoggerFactory(), this, request, response)); state = ProcessorState.Read; } else if (responseState == ResponseState.KeepAlive) { logger.trace("(WKA)"); diff --git a/src/main/java/io/fusionauth/http/server/HTTPProcessor.java b/src/main/java/io/fusionauth/http/server/HTTPProcessor.java index 41479f6..590f65c 100644 --- a/src/main/java/io/fusionauth/http/server/HTTPProcessor.java +++ b/src/main/java/io/fusionauth/http/server/HTTPProcessor.java @@ -73,6 +73,11 @@ public interface HTTPProcessor { */ ByteBuffer readBuffer() throws IOException; + /** + * @return The bytes per second throughput read by this processor. + */ + long readThroughput(); + /** * @return The current state of the HTTPProcessor. */ @@ -84,6 +89,11 @@ public interface HTTPProcessor { */ ByteBuffer[] writeBuffers() throws IOException; + /** + * @return The bytes per second throughput wrote by this processor. + */ + long writeThroughput(); + /** * Called by the selector to tell the HTTPProcessor that bytes were written back to the client. * diff --git a/src/main/java/io/fusionauth/http/server/HTTPResponseProcessor.java b/src/main/java/io/fusionauth/http/server/HTTPResponseProcessor.java index c067cc6..03e27fd 100644 --- a/src/main/java/io/fusionauth/http/server/HTTPResponseProcessor.java +++ b/src/main/java/io/fusionauth/http/server/HTTPResponseProcessor.java @@ -25,7 +25,7 @@ import io.fusionauth.http.body.response.ChunkedBodyProcessor; import io.fusionauth.http.body.response.ContentLengthBodyProcessor; import io.fusionauth.http.body.response.EmptyBodyProcessor; -import io.fusionauth.http.io.NonBlockingByteBufferOutputStream; +import io.fusionauth.http.io.BlockingByteBufferOutputStream; import io.fusionauth.http.log.Logger; import io.fusionauth.http.util.HTTPTools; @@ -39,7 +39,7 @@ public class HTTPResponseProcessor { private final Logger logger; - private final NonBlockingByteBufferOutputStream outputStream; + private final BlockingByteBufferOutputStream outputStream; private final HTTPRequest request; @@ -52,7 +52,7 @@ public class HTTPResponseProcessor { private volatile ResponseState state = ResponseState.Preamble; public HTTPResponseProcessor(HTTPServerConfiguration configuration, HTTPRequest request, HTTPResponse response, - NonBlockingByteBufferOutputStream outputStream) { + BlockingByteBufferOutputStream outputStream) { this.configuration = configuration; this.request = request; this.response = response; diff --git a/src/main/java/io/fusionauth/http/server/HTTPS11Processor.java b/src/main/java/io/fusionauth/http/server/HTTPS11Processor.java index f9fec7a..24db1e2 100644 --- a/src/main/java/io/fusionauth/http/server/HTTPS11Processor.java +++ b/src/main/java/io/fusionauth/http/server/HTTPS11Processor.java @@ -224,9 +224,12 @@ public ByteBuffer readBuffer() { } @Override - public ProcessorState state() { - delegate.markUsed(); + public long readThroughput() { + return delegate.readThroughput(); + } + @Override + public ProcessorState state() { if (engine == null) { return delegate.state(); } @@ -291,6 +294,11 @@ public ByteBuffer[] writeBuffers() throws IOException { return myNetData; } + @Override + public long writeThroughput() { + return delegate.writeThroughput(); + } + @Override public ProcessorState wrote(long num) throws IOException { delegate.markUsed(); diff --git a/src/main/java/io/fusionauth/http/server/HTTPServerConfiguration.java b/src/main/java/io/fusionauth/http/server/HTTPServerConfiguration.java index 65b21e7..06e83a1 100644 --- a/src/main/java/io/fusionauth/http/server/HTTPServerConfiguration.java +++ b/src/main/java/io/fusionauth/http/server/HTTPServerConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022, FusionAuth, All Rights Reserved + * Copyright (c) 2022-2023, FusionAuth, All Rights Reserved * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,6 +24,11 @@ import io.fusionauth.http.log.LoggerFactory; import io.fusionauth.http.log.SystemOutLoggerFactory; +/** + * The HTTP Server configuration. + * + * @author Brian Pontarelli + */ public class HTTPServerConfiguration implements Configurable { private final List listeners = new ArrayList<>(); @@ -45,6 +50,12 @@ public class HTTPServerConfiguration implements Configurable + * The maximum memory requirement for the output buffer can be calculated multiplying this value by the values returned from + * {@link HTTPServerConfiguration#getResponseBufferSize()} and * {@link HTTPServerConfiguration#getNumberOfWorkerThreads()}. + * + * @return the maximum output buffer queue length. + */ + public int getMaxOutputBufferQueueLength() { + return maxOutputBufferQueueLength; + } + + /** + * This configuration is the minimum number of bytes per second that a client must send a request to the server before the server closes + * the connection. + * + * @return The minimum throughput for any connection with the server in bytes per second. + */ + public long getMinimumReadThroughput() { + return minimumReadThroughput; + } + + /** + * This configuration is the minimum number of bytes per second that a client must read the response from the server before the server + * closes the connection. + * + * @return The minimum throughput for any connection with the server in bytes per second. + */ + public long getMinimumWriteThroughput() { + return minimumWriteThroughput; + } + public int getMultipartBufferSize() { return multipartBufferSize; } + /** + * The number of worker threads. This configuration will affect the runtime memory requirement. + *

+ * The maximum memory requirement for the output buffer can be calculated multiplying this value by the values returned from + * {@link HTTPServerConfiguration#getMaxOutputBufferQueueLength()} and * {@link HTTPServerConfiguration#getResponseBufferSize()}. + * + * @return the number of worker threads. + */ public int getNumberOfWorkerThreads() { return numberOfWorkerThreads; } @@ -117,6 +168,14 @@ public int getRequestBufferSize() { return requestBufferSize; } + /** + * The size of the response buffer in bytes. This configuration will affect the runtime memory requirement. + *

+ * The maximum memory requirement for the output buffer can be calculated multiplying this value by the values returned from + * {@link HTTPServerConfiguration#getMaxOutputBufferQueueLength()} and * {@link HTTPServerConfiguration#getNumberOfWorkerThreads()}. + * + * @return the response buffer size in bytes. + */ public int getResponseBufferSize() { return responseBufferSize; } @@ -145,7 +204,7 @@ public HTTPServerConfiguration withBaseDir(Path baseDir) { public HTTPServerConfiguration withClientTimeout(Duration duration) { Objects.requireNonNull(duration, "You cannot set the client timeout to null"); if (duration.isZero() || duration.isNegative()) { - throw new IllegalArgumentException("You cannot set the client timeout less than 0"); + throw new IllegalArgumentException("The client timeout duration must be greater than 0"); } @@ -219,26 +278,72 @@ public HTTPServerConfiguration withLoggerFactory(LoggerFactory loggerFactory) { return this; } + /** + * {@inheritDoc} + */ + @Override + public HTTPServerConfiguration withMaxOutputBufferQueueLength(int outputBufferQueueLength) { + if (outputBufferQueueLength < 16) { + throw new IllegalArgumentException("The maximum output buffer queue length must be greater than or equal to 16"); + } + + this.maxOutputBufferQueueLength = outputBufferQueueLength; + return this; + } + /** * {@inheritDoc} */ @Override public HTTPServerConfiguration withMaxPreambleLength(int maxLength) { if (maxLength <= 0) { - throw new IllegalArgumentException("You cannot set the max preamble length than 0"); + throw new IllegalArgumentException("The maximum preamble length must be greater than 0"); } this.maxHeadLength = maxLength; return this; } + /** + * This configures the minimum number of bytes per second that a client must send a request to the server before the server closes the + * connection. + * + * @param bytesPerSecond The bytes per second throughput. + * @return This. + */ + @Override + public HTTPServerConfiguration withMinimumReadThroughput(long bytesPerSecond) { + if (bytesPerSecond < 1024) { + throw new IllegalArgumentException("This should probably be faster than a 28.8 baud modem!"); + } + + this.minimumReadThroughput = bytesPerSecond; + return this; + } + + /** + * This configures the minimum number of bytes per second that a client must read the response from the server before the server closes + * the connection. + * + * @param bytesPerSecond The bytes per second throughput. + * @return This. + */ + public HTTPServerConfiguration withMinimumWriteThroughput(long bytesPerSecond) { + if (bytesPerSecond < 1024) { + throw new IllegalArgumentException("This should probably be faster than a 28.8 baud modem!"); + } + + this.minimumWriteThroughput = bytesPerSecond; + return this; + } + /** * {@inheritDoc} */ @Override public HTTPServerConfiguration withMultipartBufferSize(int multipartBufferSize) { if (multipartBufferSize <= 0) { - throw new IllegalArgumentException("You cannot set the multipart buffer size less than 0"); + throw new IllegalArgumentException("The multi-part buffer size must be greater than 0"); } this.multipartBufferSize = multipartBufferSize; @@ -251,7 +356,7 @@ public HTTPServerConfiguration withMultipartBufferSize(int multipartBufferSize) @Override public HTTPServerConfiguration withNumberOfWorkerThreads(int numberOfWorkerThreads) { if (numberOfWorkerThreads <= 0) { - throw new IllegalArgumentException("You cannot set the number of worker threads less than 0"); + throw new IllegalArgumentException("The number of worker threads must be greater than 0"); } this.numberOfWorkerThreads = numberOfWorkerThreads; @@ -264,7 +369,7 @@ public HTTPServerConfiguration withNumberOfWorkerThreads(int numberOfWorkerThrea @Override public HTTPServerConfiguration withPreambleBufferSize(int size) { if (size <= 0) { - throw new IllegalArgumentException("You cannot set the preamble buffer size less than 0"); + throw new IllegalArgumentException("The preamble buffer size must be greater than 0"); } this.preambleBufferSize = size; @@ -277,7 +382,7 @@ public HTTPServerConfiguration withPreambleBufferSize(int size) { @Override public HTTPServerConfiguration withRequestBufferSize(int requestBufferSize) { if (requestBufferSize <= 0) { - throw new IllegalArgumentException("You cannot set the request buffer size less than 0"); + throw new IllegalArgumentException("The request buffer size must be greater than 0"); } this.requestBufferSize = requestBufferSize; @@ -290,7 +395,7 @@ public HTTPServerConfiguration withRequestBufferSize(int requestBufferSize) { @Override public HTTPServerConfiguration withResponseBufferSize(int responseBufferSize) { if (responseBufferSize <= 0) { - throw new IllegalArgumentException("You cannot set the response buffer size less than 0"); + throw new IllegalArgumentException("The response buffer size must be greater than 0"); } this.responseBufferSize = responseBufferSize; @@ -305,7 +410,7 @@ public HTTPServerConfiguration withShutdownDuration(Duration duration) { Objects.requireNonNull(duration, "You cannot set the shutdown duration to null"); if (duration.isZero() || duration.isNegative()) { - throw new IllegalArgumentException("You cannot set the shutdown duration less than 0"); + throw new IllegalArgumentException("The shutdown duration must be grater than 0"); } this.shutdownDuration = duration; diff --git a/src/main/java/io/fusionauth/http/server/HTTPServerThread.java b/src/main/java/io/fusionauth/http/server/HTTPServerThread.java index 25123b7..e5c0d3f 100644 --- a/src/main/java/io/fusionauth/http/server/HTTPServerThread.java +++ b/src/main/java/io/fusionauth/http/server/HTTPServerThread.java @@ -54,6 +54,10 @@ public class HTTPServerThread extends Thread implements Closeable, Notifier { private final Logger logger; + private final long minimumReadThroughput; + + private final long minimumWriteThroughput; + private final ByteBuffer preambleBuffer; private final Selector selector; @@ -70,6 +74,8 @@ public HTTPServerThread(HTTPServerConfiguration configuration, HTTPListenerConfi this.listenerConfiguration = listenerConfiguration; this.instrumenter = configuration.getInstrumenter(); this.logger = configuration.getLoggerFactory().getLogger(HTTPServerThread.class); + this.minimumReadThroughput = configuration.getMinimumReadThroughput(); + this.minimumWriteThroughput = configuration.getMinimumWriteThroughput(); this.preambleBuffer = ByteBuffer.allocate(configuration.getPreambleBufferSize()); this.selector = Selector.open(); this.threadPool = threadPool; @@ -235,6 +241,11 @@ private void cancelAndCloseKey(SelectionKey key) { logger.debug("Closing connection to client [{}]", socketChannel.getRemoteAddress().toString()); } + // Close the processor, which should kill the thread + if (key.attachment() != null) { + ((HTTPProcessor) key.attachment()).close(false); + } + key.cancel(); if (client.validOps() != SelectionKey.OP_ACCEPT && instrumenter != null) { @@ -251,33 +262,43 @@ private void cancelAndCloseKey(SelectionKey key) { @SuppressWarnings("resource") private void cleanup() { long now = System.currentTimeMillis(); - selector.keys() - .stream() - .filter(key -> key.attachment() != null) - .filter(key -> ((HTTPProcessor) key.attachment()).lastUsed() < now - clientTimeout.toMillis()) - .forEach(key -> { - if (logger.isDebugEnabled()) { - var client = (SocketChannel) key.channel(); - try { - logger.debug("Closing client connection [{}] due to inactivity", client.getRemoteAddress().toString()); - - StringBuilder threadDump = new StringBuilder(); - for (Map.Entry entry : Thread.getAllStackTraces().entrySet()) { - threadDump.append(entry.getKey()).append(" ").append(entry.getKey().getState()).append("\n"); - for (StackTraceElement ste : entry.getValue()) { - threadDump.append("\tat ").append(ste).append("\n"); - } - threadDump.append("\n"); - } - - logger.debug("Thread dump from server side.\n" + threadDump); - } catch (IOException e) { - // Ignore because we are just debugging - } - } - - cancelAndCloseKey(key); - }); + for (SelectionKey key : selector.keys()) { + if (key.attachment() == null) { + continue; + } + + var processor = (HTTPProcessor) key.attachment(); + boolean badChannel = + (processor.state() == ProcessorState.Read && processor.readThroughput() < minimumReadThroughput) || // Not reading fast enough + (processor.state() == ProcessorState.Write && processor.writeThroughput() < minimumWriteThroughput) || // Not writing fast enough + (processor.lastUsed() < now - clientTimeout.toMillis()); // Timed out + + if (!badChannel) { + continue; + } + + if (logger.isDebugEnabled()) { + var client = (SocketChannel) key.channel(); + try { + logger.debug("Closing client connection [{}] due to inactivity", client.getRemoteAddress().toString()); + + StringBuilder threadDump = new StringBuilder(); + for (Map.Entry entry : Thread.getAllStackTraces().entrySet()) { + threadDump.append(entry.getKey()).append(" ").append(entry.getKey().getState()).append("\n"); + for (StackTraceElement ste : entry.getValue()) { + threadDump.append("\tat ").append(ste).append("\n"); + } + threadDump.append("\n"); + } + + logger.debug("Thread dump from server side.\n" + threadDump); + } catch (IOException e) { + // Ignore because we are just debugging + } + } + + cancelAndCloseKey(key); + } } private String ipAddress(SocketChannel client) throws IOException { diff --git a/src/test/java/io/fusionauth/http/BaseTest.java b/src/test/java/io/fusionauth/http/BaseTest.java index c97a6de..cae8c3d 100644 --- a/src/test/java/io/fusionauth/http/BaseTest.java +++ b/src/test/java/io/fusionauth/http/BaseTest.java @@ -164,6 +164,8 @@ public HTTPServer makeServer(String scheme, HTTPHandler handler, Instrumenter in .withExpectValidator(expectValidator) .withInstrumenter(instrumenter) .withLoggerFactory(AccumulatingLoggerFactory.FACTORY) + .withMinimumReadThroughput(200 * 1024) + .withMinimumWriteThroughput(200 * 1024) .withNumberOfWorkerThreads(1) .withListener(listenerConfiguration); } @@ -358,7 +360,7 @@ public void onTestFailure(ITestResult result) { HTTP Trace: {{trace}} - ----------------- + ----------------- """.replace("{{exception}}", throwable != null ? throwable.getClass().getSimpleName() : "-") .replace("{{message}}", throwable != null ? (throwable.getMessage() != null ? throwable.getMessage() : "-") : "-") .replace("{{trace}}", trace)); diff --git a/src/test/java/io/fusionauth/http/CoreTest.java b/src/test/java/io/fusionauth/http/CoreTest.java index 11ccca2..184e530 100644 --- a/src/test/java/io/fusionauth/http/CoreTest.java +++ b/src/test/java/io/fusionauth/http/CoreTest.java @@ -26,10 +26,13 @@ import java.net.http.HttpRequest.BodyPublishers; import java.net.http.HttpResponse.BodySubscribers; import java.nio.charset.StandardCharsets; +import java.security.SecureRandom; import java.security.cert.Certificate; import java.time.Duration; +import java.util.Base64; import java.util.List; import java.util.Locale; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.zip.InflaterInputStream; import com.inversoft.net.ssl.SSLTools; @@ -250,7 +253,7 @@ public void hugeHeaders(String scheme) throws Exception { // 260 characters for a total of 16,640 bytes per header value. 5 headers for a total of 83,200 bytes HTTPHandler handler = (req, res) -> { res.setHeader(Headers.ContentType, "text/plain"); - res.setHeader("Content-Length", "16"); + res.setHeader(Headers.ContentLength, "16"); res.setHeader("X-Huge-Header-1", LongString); res.setHeader("X-Huge-Header-2", LongString); res.setHeader("X-Huge-Header-3", LongString); @@ -302,7 +305,7 @@ public void logger() { public void performance(String scheme) throws Exception { HTTPHandler handler = (req, res) -> { res.setHeader(Headers.ContentType, "text/plain"); - res.setHeader("Content-Length", "16"); + res.setHeader(Headers.ContentLength, "16"); res.setStatus(200); try { @@ -346,7 +349,7 @@ public void performance(String scheme) throws Exception { public void performanceNoKeepAlive(String scheme) throws Exception { HTTPHandler handler = (req, res) -> { res.setHeader(Headers.ContentType, "text/plain"); - res.setHeader("Content-Length", "16"); + res.setHeader(Headers.ContentLength, "16"); res.setStatus(200); try { @@ -392,7 +395,7 @@ public void performanceNoKeepAlive(String scheme) throws Exception { public void serverClosesSockets(String scheme) { HTTPHandler handler = (req, res) -> { res.setHeader(Headers.ContentType, "text/plain"); - res.setHeader("Content-Length", "16"); + res.setHeader(Headers.ContentLength, "16"); res.setStatus(200); try { @@ -498,7 +501,7 @@ public void simpleGet(String scheme) throws Exception { public void simpleGetMultiplePorts() throws Exception { HTTPHandler handler = (req, res) -> { res.setHeader(Headers.ContentType, "text/plain"); - res.setHeader("Content-Length", "16"); + res.setHeader(Headers.ContentLength, "16"); res.setStatus(200); try { @@ -562,7 +565,7 @@ public void simplePost(String scheme) throws Exception { System.out.println("Done"); res.setHeader(Headers.ContentType, "text/plain"); - res.setHeader("Content-Length", "16"); + res.setHeader(Headers.ContentLength, "16"); res.setStatus(200); try { @@ -588,6 +591,61 @@ public void simplePost(String scheme) throws Exception { } } + @Test(dataProvider = "schemes") + public void slowWrites(String scheme) throws Exception { + // Test a slow connection where the HTTP server is blocked because we cannot write to the output stream as fast as we'd like + + // The default test config will use a 16k buffer and a queue size of 16. + // - Trying to write 8 MB slowly should cause an error. + + // 8 MB bytes + byte[] bytes = new byte[1024 * 1024 * 8]; + new SecureRandom().nextBytes(bytes); + + // Base64 encoded large string + String largeRequest = Base64.getEncoder().encodeToString(bytes); + + HTTPHandler handler = (req, res) -> { + res.setHeader(Headers.ContentType, "text/plain; charset=UTF-8"); + int contentLength = largeRequest.getBytes(StandardCharsets.UTF_8).length; + System.out.println("set Content-Length: " + contentLength); + res.setHeader(Headers.ContentLength, String.valueOf(contentLength)); + res.setStatus(200); + + Writer writer = res.getWriter(); + writer.write(largeRequest); + writer.close(); + }; + + AtomicBoolean slept = new AtomicBoolean(false); + try (HTTPServer ignore = makeServer(scheme, handler).start()) { + URI uri = makeURI(scheme, ""); + var client = makeClient(scheme, null); + client.send( + HttpRequest.newBuilder().uri(uri).GET().build(), + r -> BodySubscribers.ofByteArrayConsumer(optional -> { + byte[] actual = optional.orElse(null); + if (actual != null) { + // Sleep once only since the server should fail after the first batch, but since Java or the OS might cache a lot of bytes it + // read from the socket, we can't sleep for too long, otherwise, this test will never complete + if (!slept.get()) { + int sleep = 5 * 1_000; + System.out.println("received [" + actual.length + "] bytes. Sleep [" + sleep + "]"); + sleep(sleep); // We expect to only wait for 2,000 ms + slept.set(true); + } + } else { + System.out.println("no bytes"); + } + }) + ); + + fail("Should have thrown"); + } catch (IOException e) { + // Expected + } + } + @Test(dataProvider = "schemes") public void statusOnly(String scheme) throws Exception { HTTPHandler handler = (req, res) -> res.setStatus(200); @@ -637,7 +695,7 @@ public void writer(String scheme) throws Exception { req.getInputStream().readAllBytes(); res.setHeader(Headers.ContentType, "text/plain; charset=UTF-16"); - res.setHeader("Content-Length", "" + ExpectedResponse.getBytes(StandardCharsets.UTF_16).length); // Recalculate the byte length using UTF-16 + res.setHeader(Headers.ContentLength, String.valueOf(ExpectedResponse.getBytes(StandardCharsets.UTF_16).length)); // Recalculate the byte length using UTF-16 res.setStatus(200); Writer writer = res.getWriter();