Skip to content

Commit

Permalink
Stop the event queue task when event queue is empty (#1685)
Browse files Browse the repository at this point in the history
* stop the event queue task whenever no event to be processed (short running) and added tests

* changes to resolve review comments

* allow more time for the event queue task to stop
  • Loading branch information
dannybtsai authored Aug 20, 2021
1 parent c3f9f42 commit 08e98d1
Show file tree
Hide file tree
Showing 3 changed files with 208 additions and 109 deletions.
197 changes: 107 additions & 90 deletions src/Microsoft.IdentityModel.Tokens/EventBasedLRUCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,29 @@ internal class EventBasedLRUCache<TKey, TValue> : IDisposable
private LinkedList<LRUCacheItem<TKey, TValue>> _doubleLinkedList = new LinkedList<LRUCacheItem<TKey, TValue>>();
private BlockingCollection<Action> _eventQueue = new BlockingCollection<Action>();

// the event queue and maintenance tasks
#region event queue

// The time the _eventQueueTask should run after it is started (should continue even if the queue is empty to reduce the task startup overhead), default to 120 seconds.
// TODO: consider implementing a better algorithm that tracks and predicts the usage patterns and adjusts this value dynamically.
public int EventQueueTaskExecutionTimeInSeconds { get; private set; } = 120;

// the event queue task
private Task _eventQueueTask;

// The time when the _eventQueueTask should end. The intent is to reduce the overhead costs of starting/ending tasks too frequently
// but at the same time keep the _eventQueueTask a short running task.
// Since Task is based on thread pool the overhead should be reasonable.
private DateTime _eventQueueTaskStopTime;

// task states used to ensure thread safety (Interlocked.CompareExchange)
private const int EventQueueTaskStopped = 0; // task not started yet
private const int EventQueueTaskRunning = 1; // task is running
private const int EventQueueTaskDoNotStop = 2; // force the task to continue even it has past the _eventQueueTaskStopTime, see StartEventQueueTaskIfNotRunning() for more details

private int _eventQueueTaskState = EventQueueTaskStopped;

#endregion

private ConcurrentDictionary<TKey, LRUCacheItem<TKey, TValue>> _map;
// When the current cache size gets to this percentage of _capacity, _compactionPercentage% of the cache will be removed.
private readonly double _maxCapacityPercentage = .95;
Expand All @@ -72,13 +92,6 @@ internal class EventBasedLRUCache<TKey, TValue> : IDisposable

private readonly TaskCreationOptions _options;

// task states used to ensure thread safety (Interlocked.CompareExchange)
private const int EventQueueTaskStopped = 0; // task not started yet
private const int EventQueueTaskRunning = 1; // task is running
private const int EventQueueTaskStopRequested = 2; // a request has been received to stop the task

private int _eventQueueTaskState = EventQueueTaskStopped;

// timer that removes expired items periodically
private Timer _timer = null;

Expand All @@ -99,50 +112,12 @@ internal EventBasedLRUCache(
_map = new ConcurrentDictionary<TKey, LRUCacheItem<TKey, TValue>>(comparer ?? EqualityComparer<TKey>.Default);
_cleanUpIntervalInMilliSeconds = 1000 * cleanUpIntervalInSeconds;
_removeExpiredValues = removeExpiredValues;
_eventQueueTaskStopTime = DateTime.UtcNow;

if (_removeExpiredValues)
_timer = new Timer(RemoveExpiredValuesPeriodically, null, _cleanUpIntervalInMilliSeconds, _cleanUpIntervalInMilliSeconds); // initial delay then ticks every periodInMilliSeconds
}

/// <summary>
/// This is the delegate for starting the event queue task (and the timer to remove the expired items if _removeExpiredValues is true).
/// The task keeps running until it is disposed or cancelled (_eventQueueTaskState is set to EventQueueTaskStopRequested).
/// The task and timer are synchronized; both are running or stopped.
/// </summary>
private void EventQueueTaskAction()
{
Interlocked.Increment(ref _taskCount);

if (_removeExpiredValues)
{
ResumeTimer();
}

// Keep running until it is to be disposed, or when asked to stop (_eventQueueTaskState = EventQueueTaskStopRequested that happens in the OnLinkedListItemRemoved method).
// If _eventQueueTaskState == EventQueueTaskStopRequested, the Interlocked.CompareExchange() will set the _eventQueueTaskState to EventQueueTaskStopped and the
// while loop will exit (as the loop only continues if the original value of _eventQueueTaskState != EventQueueTaskStopRequested).
// Interlocked.CompareExchange() is called to check/set the _eventQueueTaskState value to avoid potential race condition that one thread
// is trying to start the task while another trying to stop it.
while (!_disposed && Interlocked.CompareExchange(ref _eventQueueTaskState, EventQueueTaskStopped, EventQueueTaskStopRequested) != EventQueueTaskStopRequested)
{
try
{
if (_eventQueue.TryTake(out var action, _tryTakeTimeout))
action.Invoke();
}
catch (Exception ex)
{
LogHelper.LogWarning(LogHelper.FormatInvariant(LogMessages.IDX10900, ex));
}
}

Interlocked.Decrement(ref _taskCount);
if (_removeExpiredValues && _taskCount == 0) // pause the timer only if the _taskCount is 0 to avoid the scenario that it is being resumed (above)
{
PauseTimer();
}
}

public bool Contains(TKey key)
{
if (key == null)
Expand All @@ -163,12 +138,6 @@ internal void WaitForProcessing()
}
}

/// <summary>
/// FOR TESTING PURPOSES ONLY.
/// This is for tests to verify all tasks exit at the end of tests if the queue is empty.
/// </summary>
internal int TaskCount => _taskCount;

internal int RemoveExpiredValues()
{
int numItemsRemoved = 0;
Expand Down Expand Up @@ -275,7 +244,7 @@ public bool SetValue(TKey key, TValue value, DateTime expirationTime)
_map[key] = newCacheItem;

// start the event queue task if it is not running
StartEventQueueTasksIfNotRunning();
StartEventQueueTaskIfNotRunning();
}

return true;
Expand Down Expand Up @@ -336,69 +305,107 @@ public bool TryRemove(TKey key, out TValue value)
private void RemoveItemFromLinkedList(LRUCacheItem<TKey, TValue> newCacheItem)
{
_doubleLinkedList.Remove(newCacheItem);
OnLinkedListItemRemoved();
}


/// <summary>
/// The method handling the event when an item is removed from the LinkedList.
/// The _eventQueueTask needs to be cancelled if the _doubleLinkedList is empty.
/// This is the delegate for the event queue task (and the timer to remove the expired items if _removeExpiredValues is true).
/// The task keeps running until it is disposed or expired (DateTime.UtcNow > _eventQueueTaskEndTime).
/// Note the task and timer are synchronized; both are running or stopped.
/// </summary>
private void OnLinkedListItemRemoved()
private void EventQueueTaskAction()
{
// To avoid race condition (another thread adds an item to the queue, in between this thread checking if the event queue is empty and changing the state)
// Setting the state to EventQueueTaskStopRequested guarantees that other threads adding items to the queue while this thread is checking the event queue count
// will see the changed state and set it back to EventQueueTaskRunning (see the method StartEventQueueTasksIfNotRunning)
_eventQueueTaskState = EventQueueTaskStopRequested; // set to stopping to prevent task being started
Interlocked.Increment(ref _taskCount);

if (_removeExpiredValues)
{
ResumeTimer();
}

// Keep running until it is disposed, or expired (DateTime.UtcNow > _eventQueueTaskEndTime).
while (!_disposed)
{
// always set the state to EventQueueTaskRunning in case it was set to EventQueueTaskDoNotStop
_eventQueueTaskState = EventQueueTaskRunning;

try
{
if (_eventQueue.TryTake(out var action, _tryTakeTimeout))
{
action?.Invoke();
}
else if (DateTime.UtcNow > _eventQueueTaskStopTime) // no more event to be processed, exit if expired
{
// Setting _eventQueueTaskState = EventQueueTaskStopped if the _eventQueueTaskEndTime has past and _eventQueueTaskState == EventQueueTaskRunning.
// This means no other thread came in and it is safe to end this task.
// If another thread adds new events while this task is still running, it will set the _eventQueueTaskState = EventQueueTaskDoNotStop instead of starting a new task.
// The Interlocked.CompareExchange() call below will not succeed and the loop continues (until the event queue is empty and the _eventQueueTaskEndTime expires again).
// This should prevent a rare (but theoretically possible) scenario caused by context switching.
if (Interlocked.CompareExchange(ref _eventQueueTaskState, EventQueueTaskStopped, EventQueueTaskRunning) == EventQueueTaskRunning)
break;
}
}
catch (Exception ex)
{
LogHelper.LogWarning(LogHelper.FormatInvariant(LogMessages.IDX10900, ex));
}
}

// stop the tasks only when both the _doubleLinkedList and _eventQueue are empty, otherwise change back to running
if (_doubleLinkedList.Count != 0 || _eventQueue.Count != 0)
Interlocked.Decrement(ref _taskCount);
if (_removeExpiredValues && _taskCount == 0) // pause the timer only if the _taskCount is 0 to avoid the scenario that it is being resumed (above)
{
_eventQueueTaskState = EventQueueTaskRunning; // set to the state back to running
PauseTimer();
}
}

/// <summary>
/// The method handling the event when an item is added to the LinkedList.
/// The _eventQueueTask needs to be started if the _doubleLinkedList is empty.
///
/// This method is called after an item is added to the queue, so it needs to start the event queue task if it is not running (_eventQueueTaskState == EventQueueTaskRunning),
/// and prevent the task from being stopped if it has been set to stop but not exited yet (the task while loop is still running).
/// This method is called after an item is added to the event queue, and the event queue task needs to be started if not running (_eventQueueTaskState != EventQueueTaskRunning).
/// Settitng _eventQueueTaskState to EventQueueTaskRunning should prevent multiple task being started.
/// </summary>
private void StartEventQueueTasksIfNotRunning()
private void StartEventQueueTaskIfNotRunning()
{
// Setting _eventQueueTaskState to EventQueueTaskRunning will keep the event queue task in EventQueueTaskAction running.
if (Interlocked.CompareExchange(ref _eventQueueTaskState, EventQueueTaskRunning, EventQueueTaskStopRequested) == EventQueueTaskStopRequested)
_eventQueueTaskStopTime = SetTaskEndTime(); // set the time when the _eventQueueTask should end

// Setting _eventQueueTaskState to EventQueueTaskDoNotStop here will force the event queue task in EventQueueTaskAction to continue even it has past the _eventQueueTaskEndTime.
// It is mainly to prevent a rare (but theoretically possible) scenario caused by context switching
// For example:
// 1. the task execution in EventQueueTaskAction() checks _eventQueueTaskStopTime and it has already passed (ready to exit)
// 2. the execution is switched to this thread (before it calls the Interlocked.CompareExchange() to set the _eventQueueTaskState to EventQueueTaskStopped)
// 3. the _eventQueueTaskStopTime is extended but it can't stop the task from stopping as the task has already passed the time check
// 4. now since the _eventQueueTaskState == EventQueueTaskRunning, it can be set to EventQueueTaskDoNotStop by the Interlocked.CompareExchange() below
// 5. if _eventQueueTaskState is successfully set to EventQueueTaskDoNotStop, the Interlocked.CompareExchange() in the EventQueueTaskAction() will fail
// and the task will continue the while loop and the new _eventQueueTaskStopTime will keep the task running
// 6. if _eventQueueTaskState is NOT set to EventQueueTaskDoNotStop because of context switch back to the EventQueueTaskAction() and the _eventQueueTaskState is
// set to EventQueueTaskStopped (task exits), then the second Interlocked.CompareExchange() below should set the _eventQueueTaskState to EventQueueTaskRunning
// and start a task again (though this scenario is unlikely to happen)
//
// Though this kind of context switching is probably unlikely to happen, this hopefully will take care of it.

if (Interlocked.CompareExchange(ref _eventQueueTaskState, EventQueueTaskDoNotStop, EventQueueTaskRunning) == EventQueueTaskRunning)
{
return;
}

// If we get here the original value of _eventQueueTaskState is either EventQueueTaskRunning or EventQueueTaskStopped and it is safe to
// proceed to call StartEventQueueTasks() and let it start the task if it is not running (_eventQueueTaskState == EventQueueTaskRunning).
// if the task is stopped, set _eventQueueTaskState = EventQueueTaskRunning and start a new task
if (Interlocked.CompareExchange(ref _eventQueueTaskState, EventQueueTaskRunning, EventQueueTaskStopped) == EventQueueTaskStopped)
{
var eventQueueTask = new Task(EventQueueTaskAction, _options);
eventQueueTask.Start();
eventQueueTask.GetAwaiter().OnCompleted(() => DisposeTask(eventQueueTask)); // dispose the task when it is complete

_eventQueueTask = eventQueueTask;
}
}

/// <summary>
/// Dispose of the specified task (if it is in a disposable state).
/// The Dispose() method should close the event object (WaitHandle) in the task but it may not affect the active task count.
/// This is the method that determines the end time for the event queue task.
/// The goal is to be able to track the incoming events and predict how long the task should run in order to
/// avoid a long running task and reduce the overhead costs of restarting tasks.
/// For example, maybe we can track the last three events' time and set the _eventQueueRunDurationInSeconds = 2 * average_time_between_events.
/// Note: tasks are based on thread pool so the overhead should not be huge but we should still try to minimize it.
/// </summary>
/// <param name="task">the task to be disposed</param>
private static void DisposeTask(Task task)
/// <returns>the time when the event queue task should end</returns>
private DateTime SetTaskEndTime()
{
if (task != null &&
(task.Status == TaskStatus.RanToCompletion ||
task.Status == TaskStatus.Canceled ||
task.Status == TaskStatus.Faulted))
{
task.Dispose();
}
return DateTime.UtcNow.AddSeconds(EventQueueTaskExecutionTimeInSeconds);
}

/// <summary>
Expand Down Expand Up @@ -454,6 +461,18 @@ internal ItemRemoved OnItemRemoved
/// </summary>
internal long EventQueueCount => _eventQueue.Count;

/// <summary>
/// FOR TESTING PURPOSES ONLY.
/// This is for tests to verify all tasks exit at the end of tests if the queue is empty.
/// </summary>
internal int TaskCount => _taskCount;

/// <summary>
/// FOR TESTING PURPOSES ONLY.
/// This is for tests to determine how long to wait for the event queue task to complete.
/// </summary>
internal int TaskExecutionTimeInSeconds => EventQueueTaskExecutionTimeInSeconds;

#endregion

/// <summary>
Expand All @@ -478,8 +497,6 @@ protected virtual void Dispose(bool disposing)
_disposed = true;
if (disposing)
{
DisposeTask(_eventQueueTask);

if (_timer != null)
{
_timer.Dispose();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,11 @@ internal long EventQueueCountVerifying()
/// FOR TESTING PURPOSES ONLY.
/// </summary>
internal long TaskCount => _signingSignatureProviders.TaskCount + _verifyingSignatureProviders.TaskCount;
#endregion

/// <summary>
/// FOR TESTING PURPOSES ONLY.
/// </summary>
internal long TaskExecutionTimeInSeconds => _signingSignatureProviders.TaskExecutionTimeInSeconds;
#endregion
}
}
Loading

0 comments on commit 08e98d1

Please sign in to comment.