Skip to content

Commit

Permalink
Changed the non-blocking queue to block so that large responses with …
Browse files Browse the repository at this point in the history
…lots of clients doesn't cause OOMEs. This also required adding in additional failure cases when clients are reading or writing bytes too slowly with the server.
  • Loading branch information
voidmain committed Aug 29, 2023
1 parent 39c47bd commit bba43e5
Show file tree
Hide file tree
Showing 13 changed files with 395 additions and 75 deletions.
2 changes: 1 addition & 1 deletion build.savant
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jackson5Version = "3.0.1"
restifyVersion = "4.2.0"
testngVersion = "7.8.0"

project(group: "io.fusionauth", name: "java-http", version: "0.2.2", licenses: ["ApacheV2_0"]) {
project(group: "io.fusionauth", name: "java-http", version: "0.2.3", licenses: ["ApacheV2_0"]) {
workflow {
fetch {
// Dependency resolution order:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022, FusionAuth, All Rights Reserved
* Copyright (c) 2022-2023, FusionAuth, All Rights Reserved
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,7 +18,7 @@
import java.nio.ByteBuffer;

import io.fusionauth.http.HTTPValues.ControlBytes;
import io.fusionauth.http.io.NonBlockingByteBufferOutputStream;
import io.fusionauth.http.io.BlockingByteBufferOutputStream;

/**
* A body processor that handles chunked requests/responses.
Expand All @@ -32,9 +32,9 @@ public class ChunkedBodyProcessor implements BodyProcessor {

private final ByteBuffer[] currentBuffers = new ByteBuffer[3];

private final NonBlockingByteBufferOutputStream outputStream;
private final BlockingByteBufferOutputStream outputStream;

public ChunkedBodyProcessor(NonBlockingByteBufferOutputStream outputStream) {
public ChunkedBodyProcessor(BlockingByteBufferOutputStream outputStream) {
this.outputStream = outputStream;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022, FusionAuth, All Rights Reserved
* Copyright (c) 2022-2023, FusionAuth, All Rights Reserved
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,7 +17,7 @@

import java.nio.ByteBuffer;

import io.fusionauth.http.io.NonBlockingByteBufferOutputStream;
import io.fusionauth.http.io.BlockingByteBufferOutputStream;

/**
* A body processor that uses the Content-Length header to determine when the entire body has been read.
Expand All @@ -27,9 +27,9 @@
public class ContentLengthBodyProcessor implements BodyProcessor {
private final ByteBuffer[] currentBuffers = new ByteBuffer[1];

private final NonBlockingByteBufferOutputStream outputStream;
private final BlockingByteBufferOutputStream outputStream;

public ContentLengthBodyProcessor(NonBlockingByteBufferOutputStream outputStream) {
public ContentLengthBodyProcessor(BlockingByteBufferOutputStream outputStream) {
this.outputStream = outputStream;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022, FusionAuth, All Rights Reserved
* Copyright (c) 2022-2023, FusionAuth, All Rights Reserved
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,24 +17,25 @@

import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import io.fusionauth.http.server.Notifier;

/**
* This InputStream uses ByteBuffers read by the Server processor and piped into this class. Those ByteBuffers are then fed to the reader of
* the InputStream. The pushing of ByteBuffers never blocks but the reading will block if there are no more bytes to read.
* the InputStream. The pushing of ByteBuffers blocks based on the configuration and the reading will block if there are no more bytes to
* read.
* <p>
* In order to handle chunking and other considerations, this class can be sub-classed to preprocess bytes.
*
* @author Brian Pontarelli
*/
public class NonBlockingByteBufferOutputStream extends OutputStream {
public class BlockingByteBufferOutputStream extends OutputStream {
private final int bufferSize;

// Shared between writer and reader threads. No one blocks using this.
private final Queue<ByteBuffer> buffers = new ConcurrentLinkedQueue<>();
// Shared between writer and reader threads.
private final BlockingQueue<ByteBuffer> buffers;

private final Notifier notifier;

Expand All @@ -45,9 +46,10 @@ public class NonBlockingByteBufferOutputStream extends OutputStream {

private volatile boolean used;

public NonBlockingByteBufferOutputStream(Notifier notifier, int bufferSize) {
this.notifier = notifier;
public BlockingByteBufferOutputStream(Notifier notifier, int bufferSize, int maximumQueueSize) {
this.buffers = new LinkedBlockingQueue<>(maximumQueueSize);
this.bufferSize = bufferSize;
this.notifier = notifier;
}

public void clear() {
Expand Down Expand Up @@ -149,8 +151,13 @@ public void write(byte[] b, int off, int len) {

private void addBuffer(boolean notify) {
currentBuffer.flip();
if (!buffers.offer(currentBuffer)) {
throw new IllegalStateException("The ConcurrentLinkedQueue is borked. It should never reject an offer() operation.");

try {
// This will block until we have capacity. We looked and it seems as though there aren't any ways that a Worker thread can be in a
// state where it was interrupted by the HTTPServerThread and this line of code doesn't throw an InterruptedException.
buffers.put(currentBuffer);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}

currentBuffer = null;
Expand Down
48 changes: 47 additions & 1 deletion src/main/java/io/fusionauth/http/server/Configurable.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022, FusionAuth, All Rights Reserved
* Copyright (c) 2022-2023, FusionAuth, All Rights Reserved
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -140,6 +140,20 @@ default T withLoggerFactory(LoggerFactory loggerFactory) {
return (T) this;
}

/**
* Sets the maximum length of the output buffer queue. Defaults to 128.
* <p>
* This parameter will affect the runtime memory requirement of the server. This can be calculated by multiplying this value by values
* returned from {@link HTTPServerConfiguration#getResponseBufferSize()} and {@link HTTPServerConfiguration#getNumberOfWorkerThreads()}.
*
* @param outputBufferQueueLength The length of the output buffer queue.
* @return This.
*/
default T withMaxOutputBufferQueueLength(int outputBufferQueueLength) {
configuration().withMaxOutputBufferQueueLength(outputBufferQueueLength);
return (T) this;
}

/**
* Sets the max preamble length (the start-line and headers constitute the head). Defaults to 64k
*
Expand All @@ -151,6 +165,30 @@ default T withMaxPreambleLength(int maxLength) {
return (T) this;
}

/**
* This configures the minimum number of bytes per second that a client must send a request to the server before the server closes the
* connection.
*
* @param bytesPerSecond The bytes per second throughput.
* @return This.
*/
default T withMinimumReadThroughput(long bytesPerSecond) {
configuration().withMinimumReadThroughput(bytesPerSecond);
return (T) this;
}

/**
* This configures the minimum number of bytes per second that a client must read the response from the server before the server closes
* the connection.
*
* @param bytesPerSecond The bytes per second throughput.
* @return This.
*/
default T withMinimumWriteThroughput(long bytesPerSecond) {
configuration().withMinimumWriteThroughput(bytesPerSecond);
return (T) this;
}

/**
* Sets the size of the buffer that is used to process the multipart request body. This defaults to 16k.
*
Expand All @@ -164,6 +202,10 @@ default T withMultipartBufferSize(int multipartBufferSize) {

/**
* Sets the number of worker threads that will handle requests coming into the HTTP server. Defaults to 40.
* <p>
* This parameter will affect the runtime memory requirement of the server. This can be calculated by multiplying this value by the
* returned from {@link HTTPServerConfiguration#getResponseBufferSize()} and
* {@link HTTPServerConfiguration#getMaxOutputBufferQueueLength()}.
*
* @param numberOfWorkerThreads The number of worker threads.
* @return This.
Expand Down Expand Up @@ -197,6 +239,10 @@ default T withRequestBufferSize(int requestBufferSize) {

/**
* Sets the size of the buffer that is used to process the HTTP response. This defaults to 16k.
* <p>
* This parameter will affect the runtime memory requirement of the server. This can be calculated by multiplying this value by the
* returned from {@link HTTPServerConfiguration#getNumberOfWorkerThreads()} and
* {@link HTTPServerConfiguration#getMaxOutputBufferQueueLength()}.
*
* @param responseBufferSize The size of the buffer.
* @return This.
Expand Down
73 changes: 68 additions & 5 deletions src/main/java/io/fusionauth/http/server/HTTP11Processor.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022, FusionAuth, All Rights Reserved
* Copyright (c) 2022-2023, FusionAuth, All Rights Reserved
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,8 +18,9 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.util.concurrent.Future;

import io.fusionauth.http.io.NonBlockingByteBufferOutputStream;
import io.fusionauth.http.io.BlockingByteBufferOutputStream;
import io.fusionauth.http.log.Logger;
import io.fusionauth.http.util.ThreadPool;

Expand Down Expand Up @@ -47,6 +48,18 @@ public class HTTP11Processor implements HTTPProcessor {

private final ThreadPool threadPool;

private long bytesRead;

private long bytesWritten;

private long firstByteReadInstant = -1;

private long firstByteWroteInstant = -1;

private Future<?> future;

private long lastByteReadInstant = -1;

private long lastUsed = System.currentTimeMillis();

private volatile ProcessorState state;
Expand All @@ -63,7 +76,7 @@ public HTTP11Processor(HTTPServerConfiguration configuration, HTTPListenerConfig
this.request = new HTTPRequest(configuration.getContextPath(), configuration.getMultipartBufferSize(), listener.isTLS() ? "https" : "http", listener.getPort(), ipAddress);
this.requestProcessor = new HTTPRequestProcessor(configuration, request);

NonBlockingByteBufferOutputStream outputStream = new NonBlockingByteBufferOutputStream(notifier, configuration.getResponseBufferSize());
BlockingByteBufferOutputStream outputStream = new BlockingByteBufferOutputStream(notifier, configuration.getResponseBufferSize(), configuration.getMaxOutputBufferQueueLength());
this.response = new HTTPResponse(outputStream, request, configuration.isCompressByDefault());
this.responseProcessor = new HTTPResponseProcessor(configuration, request, response, outputStream);
}
Expand All @@ -72,6 +85,9 @@ public HTTP11Processor(HTTPServerConfiguration configuration, HTTPListenerConfig
public ProcessorState close(boolean endOfStream) {
logger.trace("(C)");

// Interrupt the thread if it is still running
future.cancel(true);

// Set the state to Close and return it
state = ProcessorState.Close;
return state;
Expand Down Expand Up @@ -119,6 +135,13 @@ public void markUsed() {
public ProcessorState read(ByteBuffer buffer) throws IOException {
markUsed();

bytesRead += buffer.remaining();
if (firstByteReadInstant == -1) {
lastByteReadInstant = firstByteReadInstant = System.currentTimeMillis();
} else {
lastByteReadInstant = System.currentTimeMillis();
}

logger.trace("(R)");

RequestState requestState = requestProcessor.state();
Expand All @@ -130,7 +153,7 @@ public ProcessorState read(ByteBuffer buffer) throws IOException {
// If the next state is not preamble, that means we are done processing that and ready to handle the request in a separate thread
if (requestState != RequestState.Preamble && requestState != RequestState.Expect) {
logger.trace("(RWo)");
threadPool.submit(new HTTPWorker(configuration.getHandler(), configuration.getLoggerFactory(), this, request, response));
future = threadPool.submit(new HTTPWorker(configuration.getHandler(), configuration.getLoggerFactory(), this, request, response));
}
} else {
logger.trace("(RB)");
Expand Down Expand Up @@ -174,6 +197,25 @@ public ByteBuffer readBuffer() {
return buffer;
}

@Override
public long readThroughput() {
// Haven't read anything yet, or we read everything in the first read (instants are equal)
if (firstByteReadInstant == -1 || bytesRead == 0 || lastByteReadInstant == firstByteReadInstant) {
return Long.MAX_VALUE;
}

if (firstByteWroteInstant == -1) {
long millis = System.currentTimeMillis() - firstByteReadInstant;
if (millis < 2_000) {
return Long.MAX_VALUE;
}

return (bytesRead / millis) * 1_000;
}

return (bytesRead / (lastByteReadInstant - firstByteReadInstant)) * 1_000;
}

@Override
public ProcessorState state() {
return state;
Expand All @@ -189,10 +231,31 @@ public ByteBuffer[] writeBuffers() {
return null;
}

@Override
public long writeThroughput() {
// Haven't written anything yet or not enough time has passed to calculated throughput (2s)
if (firstByteWroteInstant == -1 || bytesWritten == 0) {
return Long.MAX_VALUE;
}

long millis = System.currentTimeMillis() - firstByteWroteInstant;
if (millis < 2_000) {
return Long.MAX_VALUE;
}

// Always use `now` since this calculation is ongoing until the client reads all the bytes
return (bytesWritten / millis) * 1_000;
}

@Override
public ProcessorState wrote(long num) {
markUsed();

bytesWritten += num;
if (firstByteWroteInstant == -1) {
firstByteWroteInstant = System.currentTimeMillis();
}

if (num > 0) {
logger.trace("(W)");
response.setCommitted(true);
Expand All @@ -206,7 +269,7 @@ public ProcessorState wrote(long num) {
// Flip back to reading and back to the preamble state, so we write the real response headers. Then start the worker thread and flip the ops
requestProcessor.resetState(RequestState.Body);
responseProcessor.resetState(ResponseState.Preamble);
threadPool.submit(new HTTPWorker(configuration.getHandler(), configuration.getLoggerFactory(), this, request, response));
future = threadPool.submit(new HTTPWorker(configuration.getHandler(), configuration.getLoggerFactory(), this, request, response));
state = ProcessorState.Read;
} else if (responseState == ResponseState.KeepAlive) {
logger.trace("(WKA)");
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/io/fusionauth/http/server/HTTPProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ public interface HTTPProcessor {
*/
ByteBuffer readBuffer() throws IOException;

/**
* @return The bytes per second throughput read by this processor.
*/
long readThroughput();

/**
* @return The current state of the HTTPProcessor.
*/
Expand All @@ -84,6 +89,11 @@ public interface HTTPProcessor {
*/
ByteBuffer[] writeBuffers() throws IOException;

/**
* @return The bytes per second throughput wrote by this processor.
*/
long writeThroughput();

/**
* Called by the selector to tell the HTTPProcessor that bytes were written back to the client.
*
Expand Down
Loading

0 comments on commit bba43e5

Please sign in to comment.