Skip to content

Commit

Permalink
Fixes #11763 - Race condition in QoSHandler. (#11772)
Browse files Browse the repository at this point in the history
* Fixes #11763 - Race condition in QoSHandler.

Now using a read-write lock to atomically execute expire().
This guarantees that there are no races with resume().

The concurrency between handle() and resume(), which should be the most common case, is handled by atomic data structures.

Signed-off-by: Simone Bordet <[email protected]>
  • Loading branch information
sbordet authored May 14, 2024
1 parent 33bc4f7 commit a9b2da5
Showing 1 changed file with 93 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.CyclicTimeouts;
Expand Down Expand Up @@ -74,6 +76,7 @@ public class QoSHandler extends ConditionalHandler.Abstract
private static final Logger LOG = LoggerFactory.getLogger(QoSHandler.class);
private static final String EXPIRED_ATTRIBUTE_NAME = QoSHandler.class.getName() + ".expired";

private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final AtomicInteger state = new AtomicInteger();
private final Map<Integer, Queue<Entry>> queues = new ConcurrentHashMap<>();
private final Set<Integer> priorities = new ConcurrentSkipListSet<>(Comparator.reverseOrder());
Expand Down Expand Up @@ -181,35 +184,55 @@ protected void doStop() throws Exception

@Override
public boolean onConditionsMet(Request request, Response response, Callback callback) throws Exception
{
return process(request, response, callback);
}

private boolean process(Request request, Response response, Callback callback) throws Exception
{
if (LOG.isDebugEnabled())
LOG.debug("{} handling {}", this, request);
LOG.debug("{} processing {}", this, request);

int permits = state.getAndDecrement();
if (permits > 0)
{
return handleWithPermit(request, response, callback);
}
else
boolean expired = false;

// The read lock allows concurrency with resume(),
// which is the common case, but not with expire().
lock.readLock().lock();
try
{
if (request.getAttribute(EXPIRED_ATTRIBUTE_NAME) != null)
{
// This is a request that was suspended, and it expired.
// Do not suspend it again, just complete it with 503.
state.getAndIncrement();
notAvailable(response, callback);
}
else
int permits = state.decrementAndGet();
if (permits < 0)
{
// Avoid this race condition:
// T1 in handle() may find no permits, so it will suspend the request.
// T2 in resume() finds no suspended requests and increments the permits.
// T1 suspends the request, which will remain suspended despite permits are available.
// See correspondent state machine logic in resume() and expire().
suspend(request, response, callback);
if (request.getAttribute(EXPIRED_ATTRIBUTE_NAME) == null)
{
// Cover this race condition:
// T1 in this method may find no permits, so it will suspend the request.
// T2 in resume() finds no suspended request yet and increments the permits.
// T1 suspends the request, despite permits being available.
// This is avoided in resume() using a spin loop to wait for the request to be suspended.
// See corresponding state machine logic in resume() and expire().
suspend(request, response, callback);
return true;
}
else
{
// This is a request that was suspended, it expired, and was re-handled.
// Do not suspend it again, just complete it with 503 unavailable.
state.incrementAndGet();
expired = true;
}
}
return true;
}
finally
{
lock.readLock().unlock();
}

if (!expired)
return handleWithPermit(request, response, callback);

notAvailable(response, callback);
return true;
}

@Override
Expand Down Expand Up @@ -287,24 +310,33 @@ private void suspend(Request request, Response response, Callback callback)

private void resume(Throwable x)
{
// See correspondent state machine logic in handle() and expire().
int permits = state.getAndIncrement();
if (permits >= 0)
{
if (LOG.isDebugEnabled())
LOG.debug("{} no suspended requests to resume", this, x);
return;
}

while (true)
// Allows concurrency with process(), but not with expire().
lock.readLock().lock();
try
{
if (resumeSuspended())
// See corresponding state machine logic in process() and expire().
int permits = state.incrementAndGet();
if (permits > 0)
{
if (LOG.isDebugEnabled())
LOG.debug("{} no suspended requests to resume", this, x);
return;
}

// Found no suspended requests yet, but there will be.
// This covers the small race window in handle(), where
// the state is updated and then the request suspended.
Thread.onSpinWait();
while (true)
{
if (resumeSuspended())
return;

// Found no suspended requests yet, but there will be.
// This covers the small race window in process(), where
// the state is updated and then the request suspended.
Thread.onSpinWait();
}
}
finally
{
lock.readLock().unlock();
}
}

Expand Down Expand Up @@ -357,17 +389,32 @@ public long getExpireNanoTime()

private void expire()
{
// The request timed out, therefore it never acquired a permit.
boolean removed = queues.get(priority).remove(this);
if (removed)
boolean removed;
// It should be rare that requests expire.
// Grab the write lock to atomically operate on the queue and
// the state, avoiding concurrency with process() and resume().
lock.writeLock().lock();
try
{
// See correspondent state machine logic in handle() and resume().
state.getAndIncrement();
if (LOG.isDebugEnabled())
LOG.debug("{} timeout {}", QoSHandler.this, request);
request.setAttribute(EXPIRED_ATTRIBUTE_NAME, true);
failSuspended(request, response, callback, HttpStatus.SERVICE_UNAVAILABLE_503, new TimeoutException());
// The request timed out, therefore it was not handled.
removed = queues.get(priority).remove(this);
// The remove() may fail due to a concurrent resume().
if (removed)
{
// See corresponding state machine logic in process() and resume().
state.incrementAndGet();
if (LOG.isDebugEnabled())
LOG.debug("{} timeout {}", QoSHandler.this, request);
request.setAttribute(EXPIRED_ATTRIBUTE_NAME, true);
}
}
finally
{
lock.writeLock().unlock();
}

if (removed)
failSuspended(request, response, callback, HttpStatus.SERVICE_UNAVAILABLE_503, new TimeoutException());
}

@Override
Expand Down

0 comments on commit a9b2da5

Please sign in to comment.