From 409edba7b52fdcb6e85bdb4521981a0ebf6239d4 Mon Sep 17 00:00:00 2001 From: Pankaj Saini Date: Fri, 19 Apr 2024 12:42:59 -0700 Subject: [PATCH 1/6] Get instance id for desired control-queue(s) --- .../ControlQueueHelperTests.cs | 206 ++++++++++++++++++ .../AzureStorageOrchestrationService.cs | 88 +++++++- .../IControlQueueHelper.cs | 28 +++ 3 files changed, 320 insertions(+), 2 deletions(-) create mode 100644 Test/DurableTask.AzureStorage.Tests/ControlQueueHelperTests.cs create mode 100644 src/DurableTask.AzureStorage/ControlQueueHelpers/IControlQueueHelper.cs diff --git a/Test/DurableTask.AzureStorage.Tests/ControlQueueHelperTests.cs b/Test/DurableTask.AzureStorage.Tests/ControlQueueHelperTests.cs new file mode 100644 index 000000000..283453dd8 --- /dev/null +++ b/Test/DurableTask.AzureStorage.Tests/ControlQueueHelperTests.cs @@ -0,0 +1,206 @@ +// ---------------------------------------------------------------------------------- +// Copyright Microsoft Corporation +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ---------------------------------------------------------------------------------- + +namespace DurableTask.AzureStorage.Tests +{ + using System; + using System.Collections.Generic; + using System.Data; + using System.Linq; + using System.Threading; + using System.Threading.Tasks; + using Microsoft.VisualStudio.TestTools.UnitTesting; + + [TestClass] + public class ControlQueueHelperTests + { + private IControlQueueHelper controlQueueHelper; + private AzureStorageOrchestrationService azureStorageOrchestrationService; + private AzureStorageOrchestrationServiceSettings settings; + private int partitionCount = 4; + private Dictionary controlQueueNumberToNameMap; + private CancellationTokenSource cancellationTokenSource; + private const string TaskHub = "taskHubName"; + + [TestInitialize] + public void Initialize() + { + cancellationTokenSource = new CancellationTokenSource(); + + settings = new AzureStorageOrchestrationServiceSettings() + { + StorageConnectionString = TestHelpers.GetTestStorageAccountConnectionString(), + TaskHubName = TaskHub, + PartitionCount = partitionCount + }; + + azureStorageOrchestrationService = new AzureStorageOrchestrationService(settings); + controlQueueHelper = azureStorageOrchestrationService; + + controlQueueNumberToNameMap = new Dictionary(); + + for (int i = 0; i < partitionCount; i++) + { + var controlQueueName = AzureStorageOrchestrationService.GetControlQueueName(settings.TaskHubName, i); + controlQueueNumberToNameMap[controlQueueName] = i; + } + } + + [TestMethod] + public void GetControlQueueInstanceId_NullArgument() + { + Action a1 = () => + { + controlQueueHelper.GetControlQueueInstanceId((HashSet)null, $"prefix{Guid.NewGuid()}_"); + }; + + Assert.ThrowsException(a1); + + Action a2 = () => + { + controlQueueHelper.GetControlQueueInstanceId((HashSet)null, $"prefix{Guid.NewGuid()}_"); + }; + + Assert.ThrowsException(a2); + } + + [TestMethod] + [DataRow(new string[] { "taskHubName-control-00", "taskHubName-control-01", "taskHubName-control-02", "taskHubName-control-03", "taskHubName-control-04", "taskHubName-control-05" })] + [DataRow(new string[] { "taskHubName-control-02", "taskHubName-control--1" })] + [DataRow(new string[] { "taskHubName-control-01", "taskHubName-control-07" })] + [DataRow(new string[] { "taskHubName-control-09" })] + [DataRow(new string[] { "taskHubName-09" })] + [DataRow(new string[] { "TaskHubName-control-9" })] + [DataRow(new string[] { })] + public void GetControlQueueInstanceId_ControlQueueNames_InvalidArgument(string[] controlQueueNumbers) + { + var controlQueueNumbersHashSet = new HashSet(); + + foreach (var cQN in controlQueueNumbers) + { + controlQueueNumbersHashSet.Add(cQN); + } + + Action a = () => + { + controlQueueHelper.GetControlQueueInstanceId(controlQueueNumbersHashSet, $"prefix{Guid.NewGuid()}_"); + }; + + Assert.ThrowsException(a); + } + + [TestMethod] + [DataRow(new int[] { 0, 1, 2, 3, 4, 5 })] + [DataRow(new int[] { 2, -1 })] + [DataRow(new int[] { 1, 7 })] + [DataRow(new int[] { 9 })] + [DataRow(new int[] { })] + public void GetControlQueueInstanceId_ControlQueueNumbers_InvalidArgument(int[] controlQueueNumbers) + { + var controlQueueNumbersHashSet = new HashSet(); + + foreach (var cQN in controlQueueNumbers) + { + controlQueueNumbersHashSet.Add(cQN); + } + + Action a = () => + { + controlQueueHelper.GetControlQueueInstanceId(controlQueueNumbersHashSet, $"prefix{Guid.NewGuid()}_"); + }; + + Assert.ThrowsException(a); + } + + [TestMethod] + [DataRow(new string[] { "taskhubname-control-00", "taskhubname-control-01", "taskhubname-control-02", "taskhubname-control-03" })] + [DataRow(new string[] { "taskhubname-control-02", "taskhubname-control-03" })] + [DataRow(new string[] { "taskhubname-control-01", "taskhubname-control-03" })] + [DataRow(new string[] { "taskhubname-control-00", "taskhubname-control-01" })] + [DataRow(new string[] { "taskhubname-control-00", "taskhubname-control-02" })] + [DataRow(new string[] { "taskhubname-control-00", "taskhubname-control-03" })] + [DataRow(new string[] { "taskhubname-control-00" })] + [DataRow(new string[] { "taskhubname-control-01" })] + [DataRow(new string[] { "taskHubname-control-03" })] + public async Task GetControlQueueInstanceId_ControlQueueNames(string[] controlQueueNames) + { + Dictionary> controlQueueNumberToInstanceIds = new Dictionary>(); + + var controlQueueNumbersHashSet = new HashSet(); + + foreach (var cQN in controlQueueNames) + { + controlQueueNumbersHashSet.Add(cQN); + controlQueueNumberToInstanceIds[cQN.ToLowerInvariant()] = new List(); + } + + for (int i = 0; i < 100; i++) + { + var instanceId = controlQueueHelper.GetControlQueueInstanceId(controlQueueNumbersHashSet, $"prefix{Guid.NewGuid()}_"); + + var controlQueue = await azureStorageOrchestrationService.GetControlQueueAsync(instanceId); + var controlQueueNumber = controlQueueNumberToNameMap[controlQueue.Name]; + + controlQueueNumberToInstanceIds[controlQueue.Name.ToLowerInvariant()].Add(instanceId); + + Assert.IsTrue(controlQueueNames.Any(x => x.Equals(controlQueue.Name, StringComparison.OrdinalIgnoreCase))); + } + + foreach (var cQN in controlQueueNames) + { + Assert.IsTrue(controlQueueNumberToInstanceIds[cQN.ToLowerInvariant()].Count > 0); + } + } + + [TestMethod] + [DataRow(new int[] { 0, 1, 2, 3 })] + [DataRow(new int[] { 2, 3 })] + [DataRow(new int[] { 1, 3 })] + [DataRow(new int[] { 0, 1 })] + [DataRow(new int[] { 0, 2 })] + [DataRow(new int[] { 0, 3 })] + [DataRow(new int[] { 0 })] + [DataRow(new int[] { 1 })] + [DataRow(new int[] { 3 })] + public async Task GetControlQueueInstanceId_ControlQueueNumbers(int[] controlQueueNumbers) + { + Dictionary> controlQueueNumberToInstanceIds = new Dictionary>(); + + var controlQueueNumbersHashSet = new HashSet(); + + foreach (var cQN in controlQueueNumbers) + { + controlQueueNumbersHashSet.Add(cQN); + controlQueueNumberToInstanceIds[cQN] = new List(); + } + + + for (int i = 0; i < 100; i++) + { + var instanceId = controlQueueHelper.GetControlQueueInstanceId(controlQueueNumbersHashSet, $"prefix{Guid.NewGuid()}_"); + + var controlQueue = await azureStorageOrchestrationService.GetControlQueueAsync(instanceId); + var controlQueueNumber = controlQueueNumberToNameMap[controlQueue.Name]; + + controlQueueNumberToInstanceIds[controlQueueNumber].Add(instanceId); + + Assert.IsTrue(controlQueueNumbers.Any(x => x == controlQueueNumber)); + } + + foreach (var cQN in controlQueueNumbers) + { + Assert.IsTrue(controlQueueNumberToInstanceIds[cQN].Count > 0); + } + } + } +} diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs index 9f5d15047..880af3bd3 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs @@ -43,7 +43,8 @@ public sealed class AzureStorageOrchestrationService : IDisposable, IOrchestrationServiceQueryClient, IOrchestrationServicePurgeClient, - IEntityOrchestrationService + IEntityOrchestrationService, + IControlQueueHelper { static readonly HistoryEvent[] EmptyHistoryEventList = new HistoryEvent[0]; @@ -2048,7 +2049,7 @@ public Task DownloadBlobAsync(string blobUri) // TODO: Change this to a sticky assignment so that partition count changes can // be supported: https://github.com/Azure/azure-functions-durable-extension/issues/1 - async Task GetControlQueueAsync(string instanceId) + internal async Task GetControlQueueAsync(string instanceId) { uint partitionIndex = Fnv1aHashHelper.ComputeHash(instanceId) % (uint)this.settings.PartitionCount; string queueName = GetControlQueueName(this.settings.TaskHubName, (int)partitionIndex); @@ -2075,6 +2076,89 @@ public Task DownloadBlobAsync(string blobUri) return cachedQueue; } + #region IControlQueueHelper + + private object lockObject = new object(); + + private Dictionary controlQueueNumberToNames; + + private Dictionary ControlQueueNamesToPartitionIndex + { + get + { + if(controlQueueNumberToNames == null) + { + lock (lockObject) + { + if (controlQueueNumberToNames == null) + { + controlQueueNumberToNames = new Dictionary(); + for (int controlQueueNumber = 0; controlQueueNumber < this.settings.PartitionCount; controlQueueNumber++) + { + var controlQueueName = GetControlQueueName(this.settings.TaskHubName, controlQueueNumber); + controlQueueNumberToNames[controlQueueName] = controlQueueNumber; + } + } + } + } + + return controlQueueNumberToNames; + } + } + + /// + public string GetControlQueueInstanceId(HashSet controlQueueNames, string instanceIdPrefix = "") + { + _ = controlQueueNames == null ? throw new ArgumentNullException(nameof(controlQueueNames)) + : controlQueueNames.Count == 0 ? + throw new ArgumentException($"{nameof(controlQueueNames)} must contain at least one element.") + : controlQueueNames.Any(x => !ControlQueueNamesToPartitionIndex.Keys.Contains(x.ToLowerInvariant())) ? + throw new ArgumentException($"{nameof(controlQueueNames)} must be valid control queue names") + : controlQueueNames; + + HashSet controlQueueNumbers = new HashSet(); + foreach (var controlQueueName in controlQueueNames) + { + controlQueueNumbers.Add(ControlQueueNamesToPartitionIndex[controlQueueName.ToLowerInvariant()]); + } + + return GetControlQueueInstanceId(controlQueueNumbers, instanceIdPrefix); + } + + /// + public string GetControlQueueInstanceId(HashSet controlQueueNumbers, string instanceIdPrefix = "") + { + _ = controlQueueNumbers == null ? throw new ArgumentNullException(nameof(controlQueueNumbers)) + : controlQueueNumbers.Count == 0 ? + throw new ArgumentException($"{nameof(controlQueueNumbers)} must contain at least one element.") + : controlQueueNumbers.Any(x => x < 0 || x >= this.settings.PartitionCount) ? + throw new ArgumentException($"{nameof(controlQueueNumbers)} must contain values in range [0, {this.settings.PartitionCount}].") + : controlQueueNumbers; + + var instanceId = string.Empty; + int suffix = 0; + bool foundInstanceId = false; + + var partitionCount = this.settings.PartitionCount; + + // Updating suffix and checking control-queue being from provided list until found one. + while (!foundInstanceId) + { + suffix++; + instanceId = $"{instanceIdPrefix}{suffix}"; + var controlQueueNumber = (int)Fnv1aHashHelper.ComputeHash(instanceId) % partitionCount; + + if (controlQueueNumbers.Any(x => x == controlQueueNumber)) + { + foundInstanceId = true; + } + } + + return instanceId; + } + + #endregion IControlQueueHelper + /// /// Disposes of the current object. /// diff --git a/src/DurableTask.AzureStorage/ControlQueueHelpers/IControlQueueHelper.cs b/src/DurableTask.AzureStorage/ControlQueueHelpers/IControlQueueHelper.cs new file mode 100644 index 000000000..07e6c526d --- /dev/null +++ b/src/DurableTask.AzureStorage/ControlQueueHelpers/IControlQueueHelper.cs @@ -0,0 +1,28 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace DurableTask.AzureStorage +{ + /// + /// Provides helper functions to work with control-queues. + /// + public interface IControlQueueHelper + { + /// + /// Gets instanceId which is targeted for mentioned control-queue names. + /// + /// Collection of controlQueueNames. + /// InstanceId prefix. + /// InstanceId for control-queue. + string GetControlQueueInstanceId(HashSet controlQueueNames, string instanceIdPrefix = ""); + + /// + /// Gets instanceId which is targeted for mentioned control-queue numbers. + /// + /// Collection of controlQueueNumbers. + /// InstanceId prefix. + /// InstanceId for control-queue. + string GetControlQueueInstanceId(HashSet controlQueueNumbers, string instanceIdPrefix = ""); + } +} From 22806050e8c7309c9d56b301a5a7d8d99ae0f7ff Mon Sep 17 00:00:00 2001 From: Pankaj Saini Date: Fri, 26 Apr 2024 07:26:41 -0700 Subject: [PATCH 2/6] Bringing randomization to instance ids. --- .../ControlQueueHelperTests.cs | 41 +++++++++++++++++++ .../AzureStorageOrchestrationService.cs | 16 ++++++-- 2 files changed, 54 insertions(+), 3 deletions(-) diff --git a/Test/DurableTask.AzureStorage.Tests/ControlQueueHelperTests.cs b/Test/DurableTask.AzureStorage.Tests/ControlQueueHelperTests.cs index 283453dd8..678c5f3b2 100644 --- a/Test/DurableTask.AzureStorage.Tests/ControlQueueHelperTests.cs +++ b/Test/DurableTask.AzureStorage.Tests/ControlQueueHelperTests.cs @@ -202,5 +202,46 @@ public async Task GetControlQueueInstanceId_ControlQueueNumbers(int[] controlQue Assert.IsTrue(controlQueueNumberToInstanceIds[cQN].Count > 0); } } + + [TestMethod] + [DataRow(new int[] { 0, 1, 2, 3 })] + [DataRow(new int[] { 2, 3 })] + [DataRow(new int[] { 1, 3 })] + [DataRow(new int[] { 0, 1 })] + [DataRow(new int[] { 0, 2 })] + [DataRow(new int[] { 0, 3 })] + [DataRow(new int[] { 0 })] + [DataRow(new int[] { 1 })] + [DataRow(new int[] { 3 })] + public async Task GetControlQueueInstanceId_ControlQueueNumbers_ConstPrefix(int[] controlQueueNumbers) + { + Dictionary> controlQueueNumberToInstanceIds = new Dictionary>(); + + var controlQueueNumbersHashSet = new HashSet(); + + foreach (var cQN in controlQueueNumbers) + { + controlQueueNumbersHashSet.Add(cQN); + controlQueueNumberToInstanceIds[cQN] = new List(); + } + + + for (int i = 0; i < 100; i++) + { + var instanceId = controlQueueHelper.GetControlQueueInstanceId(controlQueueNumbersHashSet, $"prefix_"); + + var controlQueue = await azureStorageOrchestrationService.GetControlQueueAsync(instanceId); + var controlQueueNumber = controlQueueNumberToNameMap[controlQueue.Name]; + + controlQueueNumberToInstanceIds[controlQueueNumber].Add(instanceId); + + Assert.IsTrue(controlQueueNumbers.Any(x => x == controlQueueNumber)); + } + + foreach (var cQN in controlQueueNumbers) + { + Assert.IsTrue(controlQueueNumberToInstanceIds[cQN].Count > 0); + } + } } } diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs index 880af3bd3..c9af27182 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs @@ -2051,7 +2051,7 @@ public Task DownloadBlobAsync(string blobUri) // be supported: https://github.com/Azure/azure-functions-durable-extension/issues/1 internal async Task GetControlQueueAsync(string instanceId) { - uint partitionIndex = Fnv1aHashHelper.ComputeHash(instanceId) % (uint)this.settings.PartitionCount; + uint partitionIndex = GetPartitionIndex(instanceId); string queueName = GetControlQueueName(this.settings.TaskHubName, (int)partitionIndex); ControlQueue cachedQueue; @@ -2076,6 +2076,11 @@ public Task DownloadBlobAsync(string blobUri) return cachedQueue; } + private uint GetPartitionIndex(string instanceId) + { + return Fnv1aHashHelper.ComputeHash(instanceId) % (uint)this.settings.PartitionCount; + } + #region IControlQueueHelper private object lockObject = new object(); @@ -2141,12 +2146,17 @@ public string GetControlQueueInstanceId(HashSet controlQueueNumbers, string var partitionCount = this.settings.PartitionCount; + var randomString = Guid.NewGuid().ToString().Substring(24); + var instanceIdPrefixWithRandomness = $"{instanceIdPrefix}{randomString}"; + // Updating suffix and checking control-queue being from provided list until found one. while (!foundInstanceId) { suffix++; - instanceId = $"{instanceIdPrefix}{suffix}"; - var controlQueueNumber = (int)Fnv1aHashHelper.ComputeHash(instanceId) % partitionCount; + instanceId = $"{instanceIdPrefixWithRandomness}{suffix}"; + uint controlQueueNumber = GetPartitionIndex(instanceId); + + GetControlQueueName(this.settings.TaskHubName, (int)controlQueueNumber); if (controlQueueNumbers.Any(x => x == controlQueueNumber)) { From e98bc4d1e3beebbe902724b69ba3805e1d4bda7f Mon Sep 17 00:00:00 2001 From: Pankaj Saini Date: Sat, 15 Jun 2024 11:59:57 -0700 Subject: [PATCH 3/6] Updating logic for consistenc with netherite. --- .../ControlQueueHelperTests.cs | 247 ------------------ .../TestPartitionIndex.cs | 93 +++++++ .../AzureStorageOrchestrationService.cs | 98 +------ ...zureStorageOrchestrationServiceSettings.cs | 6 + .../IControlQueueHelper.cs | 28 -- 5 files changed, 112 insertions(+), 360 deletions(-) delete mode 100644 Test/DurableTask.AzureStorage.Tests/ControlQueueHelperTests.cs create mode 100644 Test/DurableTask.AzureStorage.Tests/TestPartitionIndex.cs delete mode 100644 src/DurableTask.AzureStorage/ControlQueueHelpers/IControlQueueHelper.cs diff --git a/Test/DurableTask.AzureStorage.Tests/ControlQueueHelperTests.cs b/Test/DurableTask.AzureStorage.Tests/ControlQueueHelperTests.cs deleted file mode 100644 index 678c5f3b2..000000000 --- a/Test/DurableTask.AzureStorage.Tests/ControlQueueHelperTests.cs +++ /dev/null @@ -1,247 +0,0 @@ -// ---------------------------------------------------------------------------------- -// Copyright Microsoft Corporation -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// http://www.apache.org/licenses/LICENSE-2.0 -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// ---------------------------------------------------------------------------------- - -namespace DurableTask.AzureStorage.Tests -{ - using System; - using System.Collections.Generic; - using System.Data; - using System.Linq; - using System.Threading; - using System.Threading.Tasks; - using Microsoft.VisualStudio.TestTools.UnitTesting; - - [TestClass] - public class ControlQueueHelperTests - { - private IControlQueueHelper controlQueueHelper; - private AzureStorageOrchestrationService azureStorageOrchestrationService; - private AzureStorageOrchestrationServiceSettings settings; - private int partitionCount = 4; - private Dictionary controlQueueNumberToNameMap; - private CancellationTokenSource cancellationTokenSource; - private const string TaskHub = "taskHubName"; - - [TestInitialize] - public void Initialize() - { - cancellationTokenSource = new CancellationTokenSource(); - - settings = new AzureStorageOrchestrationServiceSettings() - { - StorageConnectionString = TestHelpers.GetTestStorageAccountConnectionString(), - TaskHubName = TaskHub, - PartitionCount = partitionCount - }; - - azureStorageOrchestrationService = new AzureStorageOrchestrationService(settings); - controlQueueHelper = azureStorageOrchestrationService; - - controlQueueNumberToNameMap = new Dictionary(); - - for (int i = 0; i < partitionCount; i++) - { - var controlQueueName = AzureStorageOrchestrationService.GetControlQueueName(settings.TaskHubName, i); - controlQueueNumberToNameMap[controlQueueName] = i; - } - } - - [TestMethod] - public void GetControlQueueInstanceId_NullArgument() - { - Action a1 = () => - { - controlQueueHelper.GetControlQueueInstanceId((HashSet)null, $"prefix{Guid.NewGuid()}_"); - }; - - Assert.ThrowsException(a1); - - Action a2 = () => - { - controlQueueHelper.GetControlQueueInstanceId((HashSet)null, $"prefix{Guid.NewGuid()}_"); - }; - - Assert.ThrowsException(a2); - } - - [TestMethod] - [DataRow(new string[] { "taskHubName-control-00", "taskHubName-control-01", "taskHubName-control-02", "taskHubName-control-03", "taskHubName-control-04", "taskHubName-control-05" })] - [DataRow(new string[] { "taskHubName-control-02", "taskHubName-control--1" })] - [DataRow(new string[] { "taskHubName-control-01", "taskHubName-control-07" })] - [DataRow(new string[] { "taskHubName-control-09" })] - [DataRow(new string[] { "taskHubName-09" })] - [DataRow(new string[] { "TaskHubName-control-9" })] - [DataRow(new string[] { })] - public void GetControlQueueInstanceId_ControlQueueNames_InvalidArgument(string[] controlQueueNumbers) - { - var controlQueueNumbersHashSet = new HashSet(); - - foreach (var cQN in controlQueueNumbers) - { - controlQueueNumbersHashSet.Add(cQN); - } - - Action a = () => - { - controlQueueHelper.GetControlQueueInstanceId(controlQueueNumbersHashSet, $"prefix{Guid.NewGuid()}_"); - }; - - Assert.ThrowsException(a); - } - - [TestMethod] - [DataRow(new int[] { 0, 1, 2, 3, 4, 5 })] - [DataRow(new int[] { 2, -1 })] - [DataRow(new int[] { 1, 7 })] - [DataRow(new int[] { 9 })] - [DataRow(new int[] { })] - public void GetControlQueueInstanceId_ControlQueueNumbers_InvalidArgument(int[] controlQueueNumbers) - { - var controlQueueNumbersHashSet = new HashSet(); - - foreach (var cQN in controlQueueNumbers) - { - controlQueueNumbersHashSet.Add(cQN); - } - - Action a = () => - { - controlQueueHelper.GetControlQueueInstanceId(controlQueueNumbersHashSet, $"prefix{Guid.NewGuid()}_"); - }; - - Assert.ThrowsException(a); - } - - [TestMethod] - [DataRow(new string[] { "taskhubname-control-00", "taskhubname-control-01", "taskhubname-control-02", "taskhubname-control-03" })] - [DataRow(new string[] { "taskhubname-control-02", "taskhubname-control-03" })] - [DataRow(new string[] { "taskhubname-control-01", "taskhubname-control-03" })] - [DataRow(new string[] { "taskhubname-control-00", "taskhubname-control-01" })] - [DataRow(new string[] { "taskhubname-control-00", "taskhubname-control-02" })] - [DataRow(new string[] { "taskhubname-control-00", "taskhubname-control-03" })] - [DataRow(new string[] { "taskhubname-control-00" })] - [DataRow(new string[] { "taskhubname-control-01" })] - [DataRow(new string[] { "taskHubname-control-03" })] - public async Task GetControlQueueInstanceId_ControlQueueNames(string[] controlQueueNames) - { - Dictionary> controlQueueNumberToInstanceIds = new Dictionary>(); - - var controlQueueNumbersHashSet = new HashSet(); - - foreach (var cQN in controlQueueNames) - { - controlQueueNumbersHashSet.Add(cQN); - controlQueueNumberToInstanceIds[cQN.ToLowerInvariant()] = new List(); - } - - for (int i = 0; i < 100; i++) - { - var instanceId = controlQueueHelper.GetControlQueueInstanceId(controlQueueNumbersHashSet, $"prefix{Guid.NewGuid()}_"); - - var controlQueue = await azureStorageOrchestrationService.GetControlQueueAsync(instanceId); - var controlQueueNumber = controlQueueNumberToNameMap[controlQueue.Name]; - - controlQueueNumberToInstanceIds[controlQueue.Name.ToLowerInvariant()].Add(instanceId); - - Assert.IsTrue(controlQueueNames.Any(x => x.Equals(controlQueue.Name, StringComparison.OrdinalIgnoreCase))); - } - - foreach (var cQN in controlQueueNames) - { - Assert.IsTrue(controlQueueNumberToInstanceIds[cQN.ToLowerInvariant()].Count > 0); - } - } - - [TestMethod] - [DataRow(new int[] { 0, 1, 2, 3 })] - [DataRow(new int[] { 2, 3 })] - [DataRow(new int[] { 1, 3 })] - [DataRow(new int[] { 0, 1 })] - [DataRow(new int[] { 0, 2 })] - [DataRow(new int[] { 0, 3 })] - [DataRow(new int[] { 0 })] - [DataRow(new int[] { 1 })] - [DataRow(new int[] { 3 })] - public async Task GetControlQueueInstanceId_ControlQueueNumbers(int[] controlQueueNumbers) - { - Dictionary> controlQueueNumberToInstanceIds = new Dictionary>(); - - var controlQueueNumbersHashSet = new HashSet(); - - foreach (var cQN in controlQueueNumbers) - { - controlQueueNumbersHashSet.Add(cQN); - controlQueueNumberToInstanceIds[cQN] = new List(); - } - - - for (int i = 0; i < 100; i++) - { - var instanceId = controlQueueHelper.GetControlQueueInstanceId(controlQueueNumbersHashSet, $"prefix{Guid.NewGuid()}_"); - - var controlQueue = await azureStorageOrchestrationService.GetControlQueueAsync(instanceId); - var controlQueueNumber = controlQueueNumberToNameMap[controlQueue.Name]; - - controlQueueNumberToInstanceIds[controlQueueNumber].Add(instanceId); - - Assert.IsTrue(controlQueueNumbers.Any(x => x == controlQueueNumber)); - } - - foreach (var cQN in controlQueueNumbers) - { - Assert.IsTrue(controlQueueNumberToInstanceIds[cQN].Count > 0); - } - } - - [TestMethod] - [DataRow(new int[] { 0, 1, 2, 3 })] - [DataRow(new int[] { 2, 3 })] - [DataRow(new int[] { 1, 3 })] - [DataRow(new int[] { 0, 1 })] - [DataRow(new int[] { 0, 2 })] - [DataRow(new int[] { 0, 3 })] - [DataRow(new int[] { 0 })] - [DataRow(new int[] { 1 })] - [DataRow(new int[] { 3 })] - public async Task GetControlQueueInstanceId_ControlQueueNumbers_ConstPrefix(int[] controlQueueNumbers) - { - Dictionary> controlQueueNumberToInstanceIds = new Dictionary>(); - - var controlQueueNumbersHashSet = new HashSet(); - - foreach (var cQN in controlQueueNumbers) - { - controlQueueNumbersHashSet.Add(cQN); - controlQueueNumberToInstanceIds[cQN] = new List(); - } - - - for (int i = 0; i < 100; i++) - { - var instanceId = controlQueueHelper.GetControlQueueInstanceId(controlQueueNumbersHashSet, $"prefix_"); - - var controlQueue = await azureStorageOrchestrationService.GetControlQueueAsync(instanceId); - var controlQueueNumber = controlQueueNumberToNameMap[controlQueue.Name]; - - controlQueueNumberToInstanceIds[controlQueueNumber].Add(instanceId); - - Assert.IsTrue(controlQueueNumbers.Any(x => x == controlQueueNumber)); - } - - foreach (var cQN in controlQueueNumbers) - { - Assert.IsTrue(controlQueueNumberToInstanceIds[cQN].Count > 0); - } - } - } -} diff --git a/Test/DurableTask.AzureStorage.Tests/TestPartitionIndex.cs b/Test/DurableTask.AzureStorage.Tests/TestPartitionIndex.cs new file mode 100644 index 000000000..aaf708715 --- /dev/null +++ b/Test/DurableTask.AzureStorage.Tests/TestPartitionIndex.cs @@ -0,0 +1,93 @@ +// ---------------------------------------------------------------------------------- +// Copyright Microsoft Corporation +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ---------------------------------------------------------------------------------- + +namespace DurableTask.AzureStorage.Tests +{ + using System; + using System.Collections.Generic; + using System.Data; + using System.Threading; + using Microsoft.VisualStudio.TestTools.UnitTesting; + + [TestClass] + public class TestPartitionIndex + { + private AzureStorageOrchestrationService azureStorageOrchestrationService; + private AzureStorageOrchestrationServiceSettings settings; + private int partitionCount = 4; + private Dictionary controlQueueNumberToNameMap; + private CancellationTokenSource cancellationTokenSource; + private const string TaskHub = "taskHubName"; + + [TestInitialize] + public void Initialize() + { + cancellationTokenSource = new CancellationTokenSource(); + + settings = new AzureStorageOrchestrationServiceSettings() + { + StorageConnectionString = TestHelpers.GetTestStorageAccountConnectionString(), + TaskHubName = TaskHub, + PartitionCount = partitionCount + }; + + azureStorageOrchestrationService = new AzureStorageOrchestrationService(settings); + + controlQueueNumberToNameMap = new Dictionary(); + + for (int i = 0; i < partitionCount; i++) + { + var controlQueueName = AzureStorageOrchestrationService.GetControlQueueName(settings.TaskHubName, i); + controlQueueNumberToNameMap[controlQueueName] = i; + } + } + + [TestMethod] + [DataRow(20, false)] + [DataRow(20, true)] + public void GetPartitionIndexTest(int maxInstanceIdCount, bool enableExplicitPartitionPlacement) + { + settings.EnableExplicitPartitionPlacement = enableExplicitPartitionPlacement; + + for (uint instanceIdSuffix = 0; instanceIdSuffix < settings.PartitionCount * 4; instanceIdSuffix++) + { + Dictionary indexNumberToCount = new Dictionary(); + + for (uint indexCount = 0; indexCount < settings.PartitionCount; indexCount++) + { + indexNumberToCount[indexCount] = 0; + } + + for (int instanceCount = 0; instanceCount < maxInstanceIdCount; instanceCount++) + { + var instanceIdPrefix = Guid.NewGuid().ToString(); + + var instanceId = $"{instanceIdPrefix}!{instanceIdSuffix}"; + + var partitionIndex = azureStorageOrchestrationService.GetPartitionIndex(instanceId); + + indexNumberToCount[partitionIndex]++; + } + + if (enableExplicitPartitionPlacement) + { + Assert.AreEqual(indexNumberToCount[(uint)(instanceIdSuffix % settings.PartitionCount)], maxInstanceIdCount); + } + else + { + Assert.AreNotEqual(indexNumberToCount[(uint)(instanceIdSuffix % settings.PartitionCount)], maxInstanceIdCount); + } + } + } + } +} diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs index c9af27182..d8fee5cf2 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs @@ -43,8 +43,7 @@ public sealed class AzureStorageOrchestrationService : IDisposable, IOrchestrationServiceQueryClient, IOrchestrationServicePurgeClient, - IEntityOrchestrationService, - IControlQueueHelper + IEntityOrchestrationService { static readonly HistoryEvent[] EmptyHistoryEventList = new HistoryEvent[0]; @@ -2076,99 +2075,28 @@ public Task DownloadBlobAsync(string blobUri) return cachedQueue; } - private uint GetPartitionIndex(string instanceId) + internal uint GetPartitionIndex(string instanceId) { - return Fnv1aHashHelper.ComputeHash(instanceId) % (uint)this.settings.PartitionCount; - } - - #region IControlQueueHelper - - private object lockObject = new object(); + uint totalPartitions = (uint)this.settings.PartitionCount; - private Dictionary controlQueueNumberToNames; + int placementSeparatorPosition = instanceId.LastIndexOf('!'); - private Dictionary ControlQueueNamesToPartitionIndex - { - get + // if the instance id ends with !nnn, where nnn is an unsigned number, it indicates explicit partition placement + if ( + this.settings.EnableExplicitPartitionPlacement + && placementSeparatorPosition != -1 + && uint.TryParse(instanceId.Substring(placementSeparatorPosition + 1), out uint index)) { - if(controlQueueNumberToNames == null) - { - lock (lockObject) - { - if (controlQueueNumberToNames == null) - { - controlQueueNumberToNames = new Dictionary(); - for (int controlQueueNumber = 0; controlQueueNumber < this.settings.PartitionCount; controlQueueNumber++) - { - var controlQueueName = GetControlQueueName(this.settings.TaskHubName, controlQueueNumber); - controlQueueNumberToNames[controlQueueName] = controlQueueNumber; - } - } - } - } - - return controlQueueNumberToNames; + var partitionId = index % totalPartitions; + return (uint)partitionId; } - } - - /// - public string GetControlQueueInstanceId(HashSet controlQueueNames, string instanceIdPrefix = "") - { - _ = controlQueueNames == null ? throw new ArgumentNullException(nameof(controlQueueNames)) - : controlQueueNames.Count == 0 ? - throw new ArgumentException($"{nameof(controlQueueNames)} must contain at least one element.") - : controlQueueNames.Any(x => !ControlQueueNamesToPartitionIndex.Keys.Contains(x.ToLowerInvariant())) ? - throw new ArgumentException($"{nameof(controlQueueNames)} must be valid control queue names") - : controlQueueNames; - - HashSet controlQueueNumbers = new HashSet(); - foreach (var controlQueueName in controlQueueNames) - { - controlQueueNumbers.Add(ControlQueueNamesToPartitionIndex[controlQueueName.ToLowerInvariant()]); - } - - return GetControlQueueInstanceId(controlQueueNumbers, instanceIdPrefix); - } - - /// - public string GetControlQueueInstanceId(HashSet controlQueueNumbers, string instanceIdPrefix = "") - { - _ = controlQueueNumbers == null ? throw new ArgumentNullException(nameof(controlQueueNumbers)) - : controlQueueNumbers.Count == 0 ? - throw new ArgumentException($"{nameof(controlQueueNumbers)} must contain at least one element.") - : controlQueueNumbers.Any(x => x < 0 || x >= this.settings.PartitionCount) ? - throw new ArgumentException($"{nameof(controlQueueNumbers)} must contain values in range [0, {this.settings.PartitionCount}].") - : controlQueueNumbers; - - var instanceId = string.Empty; - int suffix = 0; - bool foundInstanceId = false; - - var partitionCount = this.settings.PartitionCount; - - var randomString = Guid.NewGuid().ToString().Substring(24); - var instanceIdPrefixWithRandomness = $"{instanceIdPrefix}{randomString}"; - - // Updating suffix and checking control-queue being from provided list until found one. - while (!foundInstanceId) + else { - suffix++; - instanceId = $"{instanceIdPrefixWithRandomness}{suffix}"; - uint controlQueueNumber = GetPartitionIndex(instanceId); - - GetControlQueueName(this.settings.TaskHubName, (int)controlQueueNumber); + return Fnv1aHashHelper.ComputeHash(instanceId) % totalPartitions; - if (controlQueueNumbers.Any(x => x == controlQueueNumber)) - { - foundInstanceId = true; - } } - - return instanceId; } - #endregion IControlQueueHelper - /// /// Disposes of the current object. /// diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs index 609a8b35d..1ff9e983d 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs @@ -318,5 +318,11 @@ internal LogHelper Logger /// Consumers that require separate dispatch (such as the new out-of-proc v2 SDKs) must set this to true. /// public bool UseSeparateQueueForEntityWorkItems { get; set; } = false; + + /// + /// Enabled explicit placement of instance to parition id. + /// if the instance id ends with !nnn, where nnn is an unsigned number, it indicates explicit partition placement + /// + public bool EnableExplicitPartitionPlacement { get; set; } = false; } } diff --git a/src/DurableTask.AzureStorage/ControlQueueHelpers/IControlQueueHelper.cs b/src/DurableTask.AzureStorage/ControlQueueHelpers/IControlQueueHelper.cs deleted file mode 100644 index 07e6c526d..000000000 --- a/src/DurableTask.AzureStorage/ControlQueueHelpers/IControlQueueHelper.cs +++ /dev/null @@ -1,28 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; - -namespace DurableTask.AzureStorage -{ - /// - /// Provides helper functions to work with control-queues. - /// - public interface IControlQueueHelper - { - /// - /// Gets instanceId which is targeted for mentioned control-queue names. - /// - /// Collection of controlQueueNames. - /// InstanceId prefix. - /// InstanceId for control-queue. - string GetControlQueueInstanceId(HashSet controlQueueNames, string instanceIdPrefix = ""); - - /// - /// Gets instanceId which is targeted for mentioned control-queue numbers. - /// - /// Collection of controlQueueNumbers. - /// InstanceId prefix. - /// InstanceId for control-queue. - string GetControlQueueInstanceId(HashSet controlQueueNumbers, string instanceIdPrefix = ""); - } -} From e9ee9c0ab49ce49325473f744bb38cce03abb555 Mon Sep 17 00:00:00 2001 From: Pankaj Saini Date: Tue, 9 Jul 2024 21:21:05 -0700 Subject: [PATCH 4/6] Updating tests --- .../TestPartitionIndex.cs | 62 +++++++++++-------- 1 file changed, 35 insertions(+), 27 deletions(-) diff --git a/Test/DurableTask.AzureStorage.Tests/TestPartitionIndex.cs b/Test/DurableTask.AzureStorage.Tests/TestPartitionIndex.cs index aaf708715..bbfa90775 100644 --- a/Test/DurableTask.AzureStorage.Tests/TestPartitionIndex.cs +++ b/Test/DurableTask.AzureStorage.Tests/TestPartitionIndex.cs @@ -29,6 +29,22 @@ public class TestPartitionIndex private CancellationTokenSource cancellationTokenSource; private const string TaskHub = "taskHubName"; + private Dictionary partitionToInstanceId = new Dictionary() + { + { 0, "sampleinstanceid!13"}, + { 1, "sampleinstanceid!3"}, + { 2, "sampleinstanceid!11"}, + { 3, "sampleinstanceid!1"} + }; + + private Dictionary partitionToInstanceIdWithExplicitPartitionPlacement = new Dictionary() + { + { 0, "sampleinstanceid!0"}, + { 1, "sampleinstanceid!1"}, + { 2, "sampleinstanceid!2"}, + { 3, "sampleinstanceid!3"} + }; + [TestInitialize] public void Initialize() { @@ -53,40 +69,32 @@ public void Initialize() } [TestMethod] - [DataRow(20, false)] - [DataRow(20, true)] - public void GetPartitionIndexTest(int maxInstanceIdCount, bool enableExplicitPartitionPlacement) + public void GetPartitionIndexTest_EnableExplicitPartitionPlacement_False() { - settings.EnableExplicitPartitionPlacement = enableExplicitPartitionPlacement; + settings.EnableExplicitPartitionPlacement = false; - for (uint instanceIdSuffix = 0; instanceIdSuffix < settings.PartitionCount * 4; instanceIdSuffix++) + foreach (var kvp in partitionToInstanceId) { - Dictionary indexNumberToCount = new Dictionary(); - - for (uint indexCount = 0; indexCount < settings.PartitionCount; indexCount++) - { - indexNumberToCount[indexCount] = 0; - } - - for (int instanceCount = 0; instanceCount < maxInstanceIdCount; instanceCount++) - { - var instanceIdPrefix = Guid.NewGuid().ToString(); + var instanceId = kvp.Value; + var expectedPartitionIndex = kvp.Key; + var partitionIndex = azureStorageOrchestrationService.GetPartitionIndex(instanceId); - var instanceId = $"{instanceIdPrefix}!{instanceIdSuffix}"; + Assert.AreEqual(expectedPartitionIndex, partitionIndex); + } + } - var partitionIndex = azureStorageOrchestrationService.GetPartitionIndex(instanceId); + [TestMethod] + public void GetPartitionIndexTest_EnableExplicitPartitionPlacement_True() + { + settings.EnableExplicitPartitionPlacement = true; - indexNumberToCount[partitionIndex]++; - } + foreach (var kvp in partitionToInstanceIdWithExplicitPartitionPlacement) + { + var instanceId = kvp.Value; + var expectedPartitionIndex = kvp.Key; + var partitionIndex = azureStorageOrchestrationService.GetPartitionIndex(instanceId); - if (enableExplicitPartitionPlacement) - { - Assert.AreEqual(indexNumberToCount[(uint)(instanceIdSuffix % settings.PartitionCount)], maxInstanceIdCount); - } - else - { - Assert.AreNotEqual(indexNumberToCount[(uint)(instanceIdSuffix % settings.PartitionCount)], maxInstanceIdCount); - } + Assert.AreEqual(expectedPartitionIndex, partitionIndex); } } } From 12719fa8569b9ff2a5cf01b3276a19a8261575a8 Mon Sep 17 00:00:00 2001 From: pasaini-microsoft <77947910+pasaini-microsoft@users.noreply.github.com> Date: Tue, 9 Jul 2024 21:23:34 -0700 Subject: [PATCH 5/6] Update src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs Co-authored-by: David Justo --- .../AzureStorageOrchestrationServiceSettings.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs index 1ff9e983d..04ff2ce5f 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs @@ -320,8 +320,8 @@ internal LogHelper Logger public bool UseSeparateQueueForEntityWorkItems { get; set; } = false; /// - /// Enabled explicit placement of instance to parition id. - /// if the instance id ends with !nnn, where nnn is an unsigned number, it indicates explicit partition placement + /// Whether to allow instanceIDs to use special syntax to land on a specific partition. + /// If enabled, when an instanceID ends with suffix '!nnn', where 'nnn' is an unsigned number, the instance will land on the partition/queue for to that number. /// public bool EnableExplicitPartitionPlacement { get; set; } = false; } From a07015d976c8da8abdfb817cba50d42268cd3652 Mon Sep 17 00:00:00 2001 From: Pankaj Saini Date: Mon, 29 Jul 2024 06:41:01 -0700 Subject: [PATCH 6/6] Addressing PR comments. --- .../TestPartitionIndex.cs | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/Test/DurableTask.AzureStorage.Tests/TestPartitionIndex.cs b/Test/DurableTask.AzureStorage.Tests/TestPartitionIndex.cs index bbfa90775..cf2b541bc 100644 --- a/Test/DurableTask.AzureStorage.Tests/TestPartitionIndex.cs +++ b/Test/DurableTask.AzureStorage.Tests/TestPartitionIndex.cs @@ -13,9 +13,7 @@ namespace DurableTask.AzureStorage.Tests { - using System; using System.Collections.Generic; - using System.Data; using System.Threading; using Microsoft.VisualStudio.TestTools.UnitTesting; @@ -25,7 +23,6 @@ public class TestPartitionIndex private AzureStorageOrchestrationService azureStorageOrchestrationService; private AzureStorageOrchestrationServiceSettings settings; private int partitionCount = 4; - private Dictionary controlQueueNumberToNameMap; private CancellationTokenSource cancellationTokenSource; private const string TaskHub = "taskHubName"; @@ -58,14 +55,6 @@ public void Initialize() }; azureStorageOrchestrationService = new AzureStorageOrchestrationService(settings); - - controlQueueNumberToNameMap = new Dictionary(); - - for (int i = 0; i < partitionCount; i++) - { - var controlQueueName = AzureStorageOrchestrationService.GetControlQueueName(settings.TaskHubName, i); - controlQueueNumberToNameMap[controlQueueName] = i; - } } [TestMethod]