Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Internal] Upgrade Resiliency: Refactors LoadBalancingPartition to fix race conditions caused by Dispose() ing too early. #4005

Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 32 additions & 16 deletions Microsoft.Azure.Cosmos/src/direct/LoadBalancingPartition.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ internal sealed class LoadBalancingPartition : IDisposable
private readonly int maxCapacity; // maxChannels * maxRequestsPerChannel

private int requestsPending = 0; // Atomic.
private static int openChannelsPending = 0; // Atomic.
// Clock hand.
private readonly SequenceGenerator sequenceGenerator =
new SequenceGenerator();
Expand Down Expand Up @@ -174,7 +175,7 @@ public async Task<StoreResponse> RequestAsync(
int targetChannels = targetCapacity / this.channelProperties.MaxRequestsPerChannel;
int channelsCreated = 0;

this.capacityLock.EnterWriteLock();
this.capacityLock.EnterUpgradeableReadLock();
try
{
if (this.openChannels.Count < targetChannels)
Expand All @@ -194,7 +195,7 @@ await this.OpenChannelAndIncrementCapacity(
}
finally
{
this.capacityLock.ExitWriteLock();
this.capacityLock.ExitUpgradeableReadLock();
}
if (channelsCreated > 0)
{
Expand All @@ -219,15 +220,21 @@ await this.OpenChannelAndIncrementCapacity(
/// <param name="activityId">An unique identifier indicating the current activity id.</param>
internal Task OpenChannelAsync(Guid activityId)
{
this.capacityLock.EnterWriteLock();
this.capacityLock.EnterUpgradeableReadLock();
try
{
if (this.capacity < this.maxCapacity)
{
return this.OpenChannelAndIncrementCapacity(
activityId: activityId,
waitForBackgroundInitializationComplete: true);

if (Interlocked.Increment(ref LoadBalancingPartition.openChannelsPending) == 1)
{
return this.OpenChannelAndIncrementCapacity(
activityId: activityId,
waitForBackgroundInitializationComplete: true);
}
else
{
return Task.CompletedTask;
}
}
else
{
Expand All @@ -243,7 +250,7 @@ internal Task OpenChannelAsync(Guid activityId)
}
finally
{
this.capacityLock.ExitWriteLock();
this.capacityLock.ExitUpgradeableReadLock();
}
}

Expand All @@ -266,7 +273,7 @@ public void Dispose()
{
this.capacityLock.Dispose();
}
catch(SynchronizationLockException)
catch (SynchronizationLockException)
{
// SynchronizationLockException is thrown if there are inflight requests during the disposal of capacityLock
// suspend this exception to avoid crashing disposing other partitions/channels in hierarchical calls
Expand All @@ -286,7 +293,7 @@ private async Task OpenChannelAndIncrementCapacity(
Guid activityId,
bool waitForBackgroundInitializationComplete)
{
Debug.Assert(this.capacityLock.IsWriteLockHeld);
Debug.Assert(this.capacityLock.IsUpgradeableReadLockHeld);

IChannel newChannel = this.channelFactory(
activityId,
Expand All @@ -307,11 +314,20 @@ private async Task OpenChannelAndIncrementCapacity(
await newChannel.OpenChannelAsync(activityId);
}

this.openChannels.Add(
new LbChannelState(
newChannel,
this.channelProperties.MaxRequestsPerChannel));
this.capacity += this.channelProperties.MaxRequestsPerChannel;
try
{
this.capacityLock.EnterWriteLock();
this.openChannels.Add(
new LbChannelState(
newChannel,
this.channelProperties.MaxRequestsPerChannel));
this.capacity += this.channelProperties.MaxRequestsPerChannel;
}
finally
{
Interlocked.Exchange(ref LoadBalancingPartition.openChannelsPending, 0);
this.capacityLock.ExitWriteLock();
}
}

/// <summary>
Expand All @@ -322,7 +338,7 @@ private async Task OpenChannelAndIncrementCapacity(
/// <param name="channelProperties">An instance of <see cref="ChannelProperties"/>.</param>
/// <param name="localRegionRequest">A boolean flag indicating if the request is intendent for local region.</param>
/// <param name="concurrentOpeningChannelSlim">An instance of <see cref="SemaphoreSlim"/>.</param>
/// <returns></returns>
/// <returns>An instance of <see cref="Channel"/>.</returns>
private static IChannel CreateAndInitializeChannel(
Guid activityId,
Uri serverUri,
Expand Down