From a03edec438b1e8df9cd9d99a12c81bb12c1756c3 Mon Sep 17 00:00:00 2001 From: Debdatta Kunda Date: Mon, 24 Jul 2023 17:51:52 -0700 Subject: [PATCH 1/2] Code changes to fix race conditions. --- .../src/direct/LoadBalancingPartition.cs | 57 +++++++++++++------ 1 file changed, 41 insertions(+), 16 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/direct/LoadBalancingPartition.cs b/Microsoft.Azure.Cosmos/src/direct/LoadBalancingPartition.cs index 0bd4cf67ca..2eb53887cc 100644 --- a/Microsoft.Azure.Cosmos/src/direct/LoadBalancingPartition.cs +++ b/Microsoft.Azure.Cosmos/src/direct/LoadBalancingPartition.cs @@ -18,6 +18,7 @@ internal sealed class LoadBalancingPartition : IDisposable private readonly int maxCapacity; // maxChannels * maxRequestsPerChannel private int requestsPending = 0; // Atomic. + private static int openChannelsPending = 0; // Atomic. // Clock hand. private readonly SequenceGenerator sequenceGenerator = new SequenceGenerator(); @@ -174,7 +175,7 @@ public async Task RequestAsync( int targetChannels = targetCapacity / this.channelProperties.MaxRequestsPerChannel; int channelsCreated = 0; - this.capacityLock.EnterWriteLock(); + this.capacityLock.EnterUpgradeableReadLock(); try { if (this.openChannels.Count < targetChannels) @@ -194,7 +195,10 @@ await this.OpenChannelAndIncrementCapacity( } finally { - this.capacityLock.ExitWriteLock(); + if (this.capacityLock.IsUpgradeableReadLockHeld) + { + this.capacityLock.ExitUpgradeableReadLock(); + } } if (channelsCreated > 0) { @@ -219,15 +223,21 @@ await this.OpenChannelAndIncrementCapacity( /// An unique identifier indicating the current activity id. internal Task OpenChannelAsync(Guid activityId) { - this.capacityLock.EnterWriteLock(); + this.capacityLock.EnterUpgradeableReadLock(); try { if (this.capacity < this.maxCapacity) { - return this.OpenChannelAndIncrementCapacity( - activityId: activityId, - waitForBackgroundInitializationComplete: true); - + if (Interlocked.Increment(ref LoadBalancingPartition.openChannelsPending) == 1) + { + return this.OpenChannelAndIncrementCapacity( + activityId: activityId, + waitForBackgroundInitializationComplete: true); + } + else + { + return Task.CompletedTask; + } } else { @@ -243,7 +253,10 @@ internal Task OpenChannelAsync(Guid activityId) } finally { - this.capacityLock.ExitWriteLock(); + if (this.capacityLock.IsUpgradeableReadLockHeld) + { + this.capacityLock.ExitUpgradeableReadLock(); + } } } @@ -266,7 +279,7 @@ public void Dispose() { this.capacityLock.Dispose(); } - catch(SynchronizationLockException) + catch (SynchronizationLockException) { // SynchronizationLockException is thrown if there are inflight requests during the disposal of capacityLock // suspend this exception to avoid crashing disposing other partitions/channels in hierarchical calls @@ -286,7 +299,7 @@ private async Task OpenChannelAndIncrementCapacity( Guid activityId, bool waitForBackgroundInitializationComplete) { - Debug.Assert(this.capacityLock.IsWriteLockHeld); + Debug.Assert(this.capacityLock.IsUpgradeableReadLockHeld); IChannel newChannel = this.channelFactory( activityId, @@ -307,11 +320,23 @@ private async Task OpenChannelAndIncrementCapacity( await newChannel.OpenChannelAsync(activityId); } - this.openChannels.Add( - new LbChannelState( - newChannel, - this.channelProperties.MaxRequestsPerChannel)); - this.capacity += this.channelProperties.MaxRequestsPerChannel; + try + { + this.capacityLock.EnterWriteLock(); + this.openChannels.Add( + new LbChannelState( + newChannel, + this.channelProperties.MaxRequestsPerChannel)); + this.capacity += this.channelProperties.MaxRequestsPerChannel; + } + finally + { + if (this.capacityLock.IsWriteLockHeld) + { + this.capacityLock.ExitWriteLock(); + Interlocked.Exchange(ref LoadBalancingPartition.openChannelsPending, 0); + } + } } /// @@ -322,7 +347,7 @@ private async Task OpenChannelAndIncrementCapacity( /// An instance of . /// A boolean flag indicating if the request is intendent for local region. /// An instance of . - /// + /// An instance of . private static IChannel CreateAndInitializeChannel( Guid activityId, Uri serverUri, From 80e1f3b6ea794ef10dd97b2f632e00f580ad967f Mon Sep 17 00:00:00 2001 From: Debdatta Kunda Date: Mon, 24 Jul 2023 22:34:54 -0700 Subject: [PATCH 2/2] Code changes to address review comments. --- .../src/direct/LoadBalancingPartition.cs | 17 ++++------------- 1 file changed, 4 insertions(+), 13 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/direct/LoadBalancingPartition.cs b/Microsoft.Azure.Cosmos/src/direct/LoadBalancingPartition.cs index 2eb53887cc..97fb998b42 100644 --- a/Microsoft.Azure.Cosmos/src/direct/LoadBalancingPartition.cs +++ b/Microsoft.Azure.Cosmos/src/direct/LoadBalancingPartition.cs @@ -195,10 +195,7 @@ await this.OpenChannelAndIncrementCapacity( } finally { - if (this.capacityLock.IsUpgradeableReadLockHeld) - { - this.capacityLock.ExitUpgradeableReadLock(); - } + this.capacityLock.ExitUpgradeableReadLock(); } if (channelsCreated > 0) { @@ -253,10 +250,7 @@ internal Task OpenChannelAsync(Guid activityId) } finally { - if (this.capacityLock.IsUpgradeableReadLockHeld) - { - this.capacityLock.ExitUpgradeableReadLock(); - } + this.capacityLock.ExitUpgradeableReadLock(); } } @@ -331,11 +325,8 @@ private async Task OpenChannelAndIncrementCapacity( } finally { - if (this.capacityLock.IsWriteLockHeld) - { - this.capacityLock.ExitWriteLock(); - Interlocked.Exchange(ref LoadBalancingPartition.openChannelsPending, 0); - } + Interlocked.Exchange(ref LoadBalancingPartition.openChannelsPending, 0); + this.capacityLock.ExitWriteLock(); } }