diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/QoSHandler.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/QoSHandler.java index 42d90fbb86d5..eb7442c62299 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/QoSHandler.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/QoSHandler.java @@ -24,6 +24,8 @@ import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.io.CyclicTimeouts; @@ -74,6 +76,7 @@ public class QoSHandler extends ConditionalHandler.Abstract private static final Logger LOG = LoggerFactory.getLogger(QoSHandler.class); private static final String EXPIRED_ATTRIBUTE_NAME = QoSHandler.class.getName() + ".expired"; + private final ReadWriteLock lock = new ReentrantReadWriteLock(); private final AtomicInteger state = new AtomicInteger(); private final Map> queues = new ConcurrentHashMap<>(); private final Set priorities = new ConcurrentSkipListSet<>(Comparator.reverseOrder()); @@ -181,35 +184,55 @@ protected void doStop() throws Exception @Override public boolean onConditionsMet(Request request, Response response, Callback callback) throws Exception + { + return process(request, response, callback); + } + + private boolean process(Request request, Response response, Callback callback) throws Exception { if (LOG.isDebugEnabled()) - LOG.debug("{} handling {}", this, request); + LOG.debug("{} processing {}", this, request); - int permits = state.getAndDecrement(); - if (permits > 0) - { - return handleWithPermit(request, response, callback); - } - else + boolean expired = false; + + // The read lock allows concurrency with resume(), + // which is the common case, but not with expire(). + lock.readLock().lock(); + try { - if (request.getAttribute(EXPIRED_ATTRIBUTE_NAME) != null) - { - // This is a request that was suspended, and it expired. - // Do not suspend it again, just complete it with 503. - state.getAndIncrement(); - notAvailable(response, callback); - } - else + int permits = state.decrementAndGet(); + if (permits < 0) { - // Avoid this race condition: - // T1 in handle() may find no permits, so it will suspend the request. - // T2 in resume() finds no suspended requests and increments the permits. - // T1 suspends the request, which will remain suspended despite permits are available. - // See correspondent state machine logic in resume() and expire(). - suspend(request, response, callback); + if (request.getAttribute(EXPIRED_ATTRIBUTE_NAME) == null) + { + // Cover this race condition: + // T1 in this method may find no permits, so it will suspend the request. + // T2 in resume() finds no suspended request yet and increments the permits. + // T1 suspends the request, despite permits are available. + // This is avoided in resume() using a spin loop to wait for the request to be suspended. + // See correspondent state machine logic in resume() and expire(). + suspend(request, response, callback); + return true; + } + else + { + // This is a request that was suspended, it expired, and was re-handled. + // Do not suspend it again, just complete it with 503 unavailable. + state.incrementAndGet(); + expired = true; + } } - return true; } + finally + { + lock.readLock().unlock(); + } + + if (!expired) + return handleWithPermit(request, response, callback); + + notAvailable(response, callback); + return true; } @Override @@ -287,24 +310,33 @@ private void suspend(Request request, Response response, Callback callback) private void resume(Throwable x) { - // See correspondent state machine logic in handle() and expire(). - int permits = state.getAndIncrement(); - if (permits >= 0) - { - if (LOG.isDebugEnabled()) - LOG.debug("{} no suspended requests to resume", this, x); - return; - } - - while (true) + // Allows concurrency with process(), but not with expire(). + lock.readLock().lock(); + try { - if (resumeSuspended()) + // See correspondent state machine logic in process() and expire(). + int permits = state.incrementAndGet(); + if (permits > 0) + { + if (LOG.isDebugEnabled()) + LOG.debug("{} no suspended requests to resume", this, x); return; + } - // Found no suspended requests yet, but there will be. - // This covers the small race window in handle(), where - // the state is updated and then the request suspended. - Thread.onSpinWait(); + while (true) + { + if (resumeSuspended()) + return; + + // Found no suspended requests yet, but there will be. + // This covers the small race window in process(), where + // the state is updated and then the request suspended. + Thread.onSpinWait(); + } + } + finally + { + lock.readLock().unlock(); } } @@ -357,17 +389,32 @@ public long getExpireNanoTime() private void expire() { - // The request timed out, therefore it never acquired a permit. - boolean removed = queues.get(priority).remove(this); - if (removed) + boolean removed; + // It should be rare that requests expire. + // Grab the write lock to atomically operate on the queue and + // the state, avoiding concurrency with process() and resume(). + lock.writeLock().lock(); + try { - // See correspondent state machine logic in handle() and resume(). - state.getAndIncrement(); - if (LOG.isDebugEnabled()) - LOG.debug("{} timeout {}", QoSHandler.this, request); - request.setAttribute(EXPIRED_ATTRIBUTE_NAME, true); - failSuspended(request, response, callback, HttpStatus.SERVICE_UNAVAILABLE_503, new TimeoutException()); + // The request timed out, therefore it was not handled. + removed = queues.get(priority).remove(this); + // The remove() may fail to a concurrent resume(). + if (removed) + { + // See correspondent state machine logic in process() and resume(). + state.incrementAndGet(); + if (LOG.isDebugEnabled()) + LOG.debug("{} timeout {}", QoSHandler.this, request); + request.setAttribute(EXPIRED_ATTRIBUTE_NAME, true); + } } + finally + { + lock.writeLock().unlock(); + } + + if (removed) + failSuspended(request, response, callback, HttpStatus.SERVICE_UNAVAILABLE_503, new TimeoutException()); } @Override