From 946bc4a9c7ae8eb90e773050eb5b7e78f23b9f0d Mon Sep 17 00:00:00 2001 From: Debdatta Kunda Date: Tue, 20 Jun 2023 16:02:04 -0700 Subject: [PATCH] Code changes to open connections in parallel. Added replica validation in contract. --- .../src/ConnectionPolicy.cs | 13 ++++++++++ .../src/CosmosClientOptions.cs | 13 ++++++++++ Microsoft.Azure.Cosmos/src/DocumentClient.cs | 3 ++- .../src/Fluent/CosmosClientBuilder.cs | 21 ++++++++++++++++ .../src/Routing/GatewayAddressCache.cs | 20 ++++++++++------ .../src/Routing/GlobalAddressResolver.cs | 5 +++- .../src/direct/ConsistencyReader.cs | 5 ++-- .../src/direct/ConsistencyWriter.cs | 6 +++-- .../src/direct/Constants.cs | 1 - .../src/direct/IStoreClientFactory.cs | 3 ++- .../src/direct/ReplicatedResourceClient.cs | 7 ++++-- .../src/direct/StoreClient.cs | 4 +++- .../src/direct/StoreClientFactory.cs | 9 ++++--- .../src/direct/StoreReader.cs | 7 +++--- .../ClientRetryPolicyTests.cs | 3 ++- .../RequestEventHandlerTests.cs | 2 +- .../StoreReaderTest.cs | 24 +++++++++++-------- 17 files changed, 109 insertions(+), 37 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/ConnectionPolicy.cs b/Microsoft.Azure.Cosmos/src/ConnectionPolicy.cs index 7abfd76deb..341de350d9 100644 --- a/Microsoft.Azure.Cosmos/src/ConnectionPolicy.cs +++ b/Microsoft.Azure.Cosmos/src/ConnectionPolicy.cs @@ -50,6 +50,7 @@ public ConnectionPolicy() this.EnableReadRequestsFallback = null; this.EnableClientTelemetry = ClientTelemetryOptions.IsClientTelemetryEnabled(); this.ServerCertificateCustomValidationCallback = null; + this.EnableReplicaValidation = false; } /// @@ -459,6 +460,18 @@ public Func HttpClientFactory set; } + /// + /// Gets or sets the flag to enable replica validation. + /// + /// + /// The default value is false + /// + public bool EnableReplicaValidation + { + get; + set; + } + /// /// (Direct/TCP) This is an advanced setting that controls the number of TCP connections that will be opened eagerly to each Cosmos DB back-end. /// diff --git a/Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs b/Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs index 1fce195915..436100dc72 100644 --- a/Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs +++ b/Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs @@ -346,6 +346,18 @@ public ConnectionMode ConnectionMode /// public bool? EnableContentResponseOnWrite { get; set; } + /// + /// Gets or sets the replica validation flag. + /// Enabling replica validation helps the cosmos client to become more + /// resilient to service upgrades by choosing a healthy replica over the + /// one undergoing an upgrade. The default value for this parameter is false. + /// + /// + /// This is optimal for workloads where latency spikes are critical during upgrades. + /// + /// + public bool EnableReplicaValidation { get; set; } = false; + /// /// (Direct/TCP) Controls the amount of idle time after which unused connections are closed. /// @@ -758,6 +770,7 @@ internal virtual ConnectionPolicy GetConnectionPolicy(int clientId) EnablePartitionLevelFailover = this.EnablePartitionLevelFailover, PortReuseMode = this.portReuseMode, EnableTcpConnectionEndpointRediscovery = this.EnableTcpConnectionEndpointRediscovery, + EnableReplicaValidation = this.EnableReplicaValidation, HttpClientFactory = this.httpClientFactory, ServerCertificateCustomValidationCallback = this.ServerCertificateCustomValidationCallback }; diff --git a/Microsoft.Azure.Cosmos/src/DocumentClient.cs b/Microsoft.Azure.Cosmos/src/DocumentClient.cs index 068bb5b857..015959ba50 100644 --- a/Microsoft.Azure.Cosmos/src/DocumentClient.cs +++ b/Microsoft.Azure.Cosmos/src/DocumentClient.cs @@ -6700,7 +6700,8 @@ private void CreateStoreModel(bool subscribeRntbdStatus) this.ConnectionPolicy.EnableReadRequestsFallback ?? (this.accountServiceConfiguration.DefaultConsistencyLevel != Documents.ConsistencyLevel.BoundedStaleness), !this.enableRntbdChannel, this.UseMultipleWriteLocations && (this.accountServiceConfiguration.DefaultConsistencyLevel != Documents.ConsistencyLevel.Strong), - true); + true, + this.ConnectionPolicy.EnableReplicaValidation); if (subscribeRntbdStatus) { diff --git a/Microsoft.Azure.Cosmos/src/Fluent/CosmosClientBuilder.cs b/Microsoft.Azure.Cosmos/src/Fluent/CosmosClientBuilder.cs index 8b72bfffa4..c7c05aae68 100644 --- a/Microsoft.Azure.Cosmos/src/Fluent/CosmosClientBuilder.cs +++ b/Microsoft.Azure.Cosmos/src/Fluent/CosmosClientBuilder.cs @@ -621,6 +621,27 @@ public CosmosClientBuilder WithContentResponseOnWrite(bool contentResponseOnWrit return this; } + /// + /// Gets or sets the boolean to only return the headers and status code in + /// the Cosmos DB response for write item operation like Create, Upsert, Patch and Replace. + /// Setting the option to false will cause the response to have a null resource. This reduces networking and CPU load by not sending + /// the resource back over the network and serializing it on the client. + /// + /// a boolean indicating whether payload will be included in the response or not. + /// + /// + /// This option can be overriden by similar property in ItemRequestOptions and TransactionalBatchItemRequestOptions + /// + /// + /// The object + /// + /// + public CosmosClientBuilder WithReplicaValidation(bool replicaValidationEnabled) + { + this.clientOptions.EnableReplicaValidation = replicaValidationEnabled; + return this; + } + /// /// The event handler to be invoked before the request is sent. /// diff --git a/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs b/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs index 14b641959a..068e79a679 100644 --- a/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs +++ b/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs @@ -61,7 +61,8 @@ public GatewayAddressCache( CosmosHttpClient httpClient, IOpenConnectionsHandler openConnectionsHandler, long suboptimalPartitionForceRefreshIntervalInSeconds = 600, - bool enableTcpConnectionEndpointRediscovery = false) + bool enableTcpConnectionEndpointRediscovery = false, + bool replicaAddressValidationEnabled = false) { this.addressEndpoint = new Uri(serviceEndpoint + "/" + Paths.AddressPathSegment); this.protocol = protocol; @@ -85,9 +86,7 @@ public GatewayAddressCache( GatewayAddressCache.ProtocolString(this.protocol)); this.openConnectionsHandler = openConnectionsHandler; - this.isReplicaAddressValidationEnabled = Helpers.GetEnvironmentVariable( - name: Constants.EnvironmentVariables.ReplicaConnectivityValidationEnabled, - defaultValue: false); + this.isReplicaAddressValidationEnabled = replicaAddressValidationEnabled; } public Uri ServiceEndpoint => this.serviceEndpoint; @@ -173,6 +172,7 @@ public async Task OpenConnectionsAsync( .GroupBy(address => address.PartitionKeyRangeId, StringComparer.Ordinal) .Select(group => this.ToPartitionAddressAndRange(collection.ResourceId, @group.ToList(), inNetworkRequest)); + List openConnectionTasks = new (); foreach (Tuple addressInfo in addressInfos) { this.serverPartitionAddressCache.Set( @@ -185,11 +185,17 @@ public async Task OpenConnectionsAsync( // other flow, the flag should be passed as `false`. if (this.openConnectionsHandler != null && shouldOpenRntbdChannels) { - await this.openConnectionsHandler - .TryOpenRntbdChannelsAsync( - addresses: addressInfo.Item2.Get(Protocol.Tcp)?.ReplicaTransportAddressUris); + openConnectionTasks + .Add(this.openConnectionsHandler + .TryOpenRntbdChannelsAsync( + addresses: addressInfo.Item2.Get(Protocol.Tcp)?.ReplicaTransportAddressUris)); } } + + if (openConnectionTasks.Any()) + { + await Task.WhenAll(openConnectionTasks); + } } } } diff --git a/Microsoft.Azure.Cosmos/src/Routing/GlobalAddressResolver.cs b/Microsoft.Azure.Cosmos/src/Routing/GlobalAddressResolver.cs index 344f994395..4e4118965b 100644 --- a/Microsoft.Azure.Cosmos/src/Routing/GlobalAddressResolver.cs +++ b/Microsoft.Azure.Cosmos/src/Routing/GlobalAddressResolver.cs @@ -39,6 +39,7 @@ internal sealed class GlobalAddressResolver : IAddressResolverExtension, IDispos private readonly CosmosHttpClient httpClient; private readonly ConcurrentDictionary addressCacheByEndpoint; private readonly bool enableTcpConnectionEndpointRediscovery; + private readonly bool replicaAddressValidationEnabled; private IOpenConnectionsHandler openConnectionsHandler; public GlobalAddressResolver( @@ -66,6 +67,7 @@ public GlobalAddressResolver( ? GlobalAddressResolver.MaxBackupReadRegions : 0; this.enableTcpConnectionEndpointRediscovery = connectionPolicy.EnableTcpConnectionEndpointRediscovery; + this.replicaAddressValidationEnabled = connectionPolicy.EnableReplicaValidation; this.maxEndpoints = maxBackupReadEndpoints + 2; // for write and alternate write endpoint (during failover) @@ -281,7 +283,8 @@ private EndpointCache GetOrAddEndpoint(Uri endpoint) this.serviceConfigReader, this.httpClient, this.openConnectionsHandler, - enableTcpConnectionEndpointRediscovery: this.enableTcpConnectionEndpointRediscovery); + enableTcpConnectionEndpointRediscovery: this.enableTcpConnectionEndpointRediscovery, + replicaAddressValidationEnabled: this.replicaAddressValidationEnabled); string location = this.endpointManager.GetLocation(endpoint); AddressResolver addressResolver = new AddressResolver(null, new NullRequestSigner(), location); diff --git a/Microsoft.Azure.Cosmos/src/direct/ConsistencyReader.cs b/Microsoft.Azure.Cosmos/src/direct/ConsistencyReader.cs index fd92ee456c..e9db63f927 100644 --- a/Microsoft.Azure.Cosmos/src/direct/ConsistencyReader.cs +++ b/Microsoft.Azure.Cosmos/src/direct/ConsistencyReader.cs @@ -144,12 +144,13 @@ public ConsistencyReader( ISessionContainer sessionContainer, TransportClient transportClient, IServiceConfigurationReader serviceConfigReader, - IAuthorizationTokenProvider authorizationTokenProvider) + IAuthorizationTokenProvider authorizationTokenProvider, + bool enableReplicaValidation) { this.addressSelector = addressSelector; this.serviceConfigReader = serviceConfigReader; this.authorizationTokenProvider = authorizationTokenProvider; - this.storeReader = new StoreReader(transportClient, addressSelector, new AddressEnumerator(), sessionContainer); + this.storeReader = new StoreReader(transportClient, addressSelector, new AddressEnumerator(), sessionContainer, enableReplicaValidation); this.quorumReader = new QuorumReader(transportClient, addressSelector, this.storeReader, serviceConfigReader, authorizationTokenProvider); } diff --git a/Microsoft.Azure.Cosmos/src/direct/ConsistencyWriter.cs b/Microsoft.Azure.Cosmos/src/direct/ConsistencyWriter.cs index 2f10eafe82..e614b51133 100644 --- a/Microsoft.Azure.Cosmos/src/direct/ConsistencyWriter.cs +++ b/Microsoft.Azure.Cosmos/src/direct/ConsistencyWriter.cs @@ -59,7 +59,8 @@ public ConsistencyWriter( TransportClient transportClient, IServiceConfigurationReader serviceConfigReader, IAuthorizationTokenProvider authorizationTokenProvider, - bool useMultipleWriteLocations) + bool useMultipleWriteLocations, + bool enableReplicaValidation) { this.transportClient = transportClient; this.addressSelector = addressSelector; @@ -71,7 +72,8 @@ public ConsistencyWriter( transportClient, addressSelector, new AddressEnumerator(), - sessionContainer: null); //we need store reader only for global strong, no session is needed*/ + sessionContainer: null, + enableReplicaValidation); //we need store reader only for global strong, no session is needed*/ } // Test hook diff --git a/Microsoft.Azure.Cosmos/src/direct/Constants.cs b/Microsoft.Azure.Cosmos/src/direct/Constants.cs index 68a5c15afe..5e39d91a09 100644 --- a/Microsoft.Azure.Cosmos/src/direct/Constants.cs +++ b/Microsoft.Azure.Cosmos/src/direct/Constants.cs @@ -2134,7 +2134,6 @@ public static class EnvironmentVariables { public const string SocketOptionTcpKeepAliveIntervalName = "AZURE_COSMOS_TCP_KEEPALIVE_INTERVAL_SECONDS"; public const string SocketOptionTcpKeepAliveTimeName = "AZURE_COSMOS_TCP_KEEPALIVE_TIME_SECONDS"; - public const string ReplicaConnectivityValidationEnabled = "AZURE_COSMOS_REPLICA_VALIDATION_ENABLED"; public const string AggressiveTimeoutDetectionEnabled = "AZURE_COSMOS_AGGRESSIVE_TIMEOUT_DETECTION_ENABLED"; public const string TimeoutDetectionTimeLimit = "AZURE_COSMOS_TIMEOUT_DETECTION_TIME_LIMIT_IN_SECONDS"; public const string TimeoutDetectionOnWriteThreshold = "AZURE_COSMOS_TIMEOUT_DETECTION_ON_WRITE_THRESHOLD"; diff --git a/Microsoft.Azure.Cosmos/src/direct/IStoreClientFactory.cs b/Microsoft.Azure.Cosmos/src/direct/IStoreClientFactory.cs index 8dd2180d12..0ccb0f525a 100644 --- a/Microsoft.Azure.Cosmos/src/direct/IStoreClientFactory.cs +++ b/Microsoft.Azure.Cosmos/src/direct/IStoreClientFactory.cs @@ -17,6 +17,7 @@ StoreClient CreateStoreClient( bool enableReadRequestsFallback = false, bool useFallbackClient = true, bool useMultipleWriteLocations = false, - bool detectClientConnectivityIssues = false); + bool detectClientConnectivityIssues = false, + bool enableReplicaValidation = false); } } diff --git a/Microsoft.Azure.Cosmos/src/direct/ReplicatedResourceClient.cs b/Microsoft.Azure.Cosmos/src/direct/ReplicatedResourceClient.cs index f3b17e7fc6..1489231323 100644 --- a/Microsoft.Azure.Cosmos/src/direct/ReplicatedResourceClient.cs +++ b/Microsoft.Azure.Cosmos/src/direct/ReplicatedResourceClient.cs @@ -68,6 +68,7 @@ public ReplicatedResourceClient( bool useMultipleWriteLocations, bool detectClientConnectivityIssues, bool disableRetryWithRetryPolicy, + bool enableReplicaValidation, RetryWithConfiguration retryWithConfiguration = null) { this.addressResolver = addressResolver; @@ -86,14 +87,16 @@ public ReplicatedResourceClient( sessionContainer, transportClient, serviceConfigReader, - authorizationTokenProvider); + authorizationTokenProvider, + enableReplicaValidation); this.consistencyWriter = new ConsistencyWriter( this.addressSelector, sessionContainer, transportClient, serviceConfigReader, authorizationTokenProvider, - useMultipleWriteLocations); + useMultipleWriteLocations, + enableReplicaValidation); this.enableReadRequestsFallback = enableReadRequestsFallback; this.useMultipleWriteLocations = useMultipleWriteLocations; this.detectClientConnectivityIssues = detectClientConnectivityIssues; diff --git a/Microsoft.Azure.Cosmos/src/direct/StoreClient.cs b/Microsoft.Azure.Cosmos/src/direct/StoreClient.cs index fb390a314c..6610ebe338 100644 --- a/Microsoft.Azure.Cosmos/src/direct/StoreClient.cs +++ b/Microsoft.Azure.Cosmos/src/direct/StoreClient.cs @@ -41,6 +41,7 @@ public StoreClient( bool useMultipleWriteLocations = false, bool detectClientConnectivityIssues = false, bool disableRetryWithRetryPolicy = false, + bool enableReplicaValidation = false, RetryWithConfiguration retryWithConfiguration = null) { this.transportClient = transportClient; @@ -68,7 +69,8 @@ public StoreClient( useMultipleWriteLocations: useMultipleWriteLocations, detectClientConnectivityIssues: detectClientConnectivityIssues, disableRetryWithRetryPolicy: disableRetryWithRetryPolicy, - retryWithConfiguration: retryWithConfiguration); + retryWithConfiguration: retryWithConfiguration, + enableReplicaValidation: enableReplicaValidation); } internal JsonSerializerSettings SerializerSettings { get; set; } diff --git a/Microsoft.Azure.Cosmos/src/direct/StoreClientFactory.cs b/Microsoft.Azure.Cosmos/src/direct/StoreClientFactory.cs index ae790bb843..dc206ad642 100644 --- a/Microsoft.Azure.Cosmos/src/direct/StoreClientFactory.cs +++ b/Microsoft.Azure.Cosmos/src/direct/StoreClientFactory.cs @@ -308,7 +308,8 @@ public StoreClient CreateStoreClient( bool enableReadRequestsFallback = false, bool useFallbackClient = true, bool useMultipleWriteLocations = false, - bool detectClientConnectivityIssues = false) + bool detectClientConnectivityIssues = false, + bool enableReplicaValidation = false) { this.ThrowIfDisposed(); if (useFallbackClient && this.fallbackTransportClient != null) @@ -326,7 +327,8 @@ public StoreClient CreateStoreClient( useMultipleWriteLocations: useMultipleWriteLocations, detectClientConnectivityIssues: detectClientConnectivityIssues, disableRetryWithRetryPolicy: this.disableRetryWithRetryPolicy, - retryWithConfiguration: this.retryWithConfiguration); + retryWithConfiguration: this.retryWithConfiguration, + enableReplicaValidation: enableReplicaValidation); } return new StoreClient( @@ -341,7 +343,8 @@ public StoreClient CreateStoreClient( useMultipleWriteLocations: useMultipleWriteLocations, detectClientConnectivityIssues: detectClientConnectivityIssues, disableRetryWithRetryPolicy: this.disableRetryWithRetryPolicy, - retryWithConfiguration: this.retryWithConfiguration); + retryWithConfiguration: this.retryWithConfiguration, + enableReplicaValidation: enableReplicaValidation); } #region IDisposable diff --git a/Microsoft.Azure.Cosmos/src/direct/StoreReader.cs b/Microsoft.Azure.Cosmos/src/direct/StoreReader.cs index 6ef9b0a4c9..962be405c8 100644 --- a/Microsoft.Azure.Cosmos/src/direct/StoreReader.cs +++ b/Microsoft.Azure.Cosmos/src/direct/StoreReader.cs @@ -25,16 +25,15 @@ public StoreReader( TransportClient transportClient, AddressSelector addressSelector, IAddressEnumerator addressEnumerator, - ISessionContainer sessionContainer) + ISessionContainer sessionContainer, + bool enableReplicaValidation) { this.transportClient = transportClient; this.addressSelector = addressSelector; this.addressEnumerator = addressEnumerator ?? throw new ArgumentNullException(nameof(addressEnumerator)); this.sessionContainer = sessionContainer; this.canUseLocalLSNBasedHeaders = VersionUtility.IsLaterThan(HttpConstants.Versions.CurrentVersion, HttpConstants.Versions.v2018_06_18); - this.isReplicaAddressValidationEnabled = Helpers.GetEnvironmentVariable( - name: Constants.EnvironmentVariables.ReplicaConnectivityValidationEnabled, - defaultValue: false); + this.isReplicaAddressValidationEnabled = enableReplicaValidation; } // Test hook diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ClientRetryPolicyTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ClientRetryPolicyTests.cs index a41f08afa1..0851952f40 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ClientRetryPolicyTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ClientRetryPolicyTests.cs @@ -209,7 +209,8 @@ private async Task ValidateConnectTimeoutTriggersClientRetryPolicy( enableReadRequestsFallback: false, useMultipleWriteLocations: useMultipleWriteLocations, detectClientConnectivityIssues: true, - disableRetryWithRetryPolicy: false); + disableRetryWithRetryPolicy: false, + enableReplicaValidation: false); // Reducing retry timeout to avoid long-running tests replicatedResourceClient.GoneAndRetryWithRetryTimeoutInSecondsOverride = 1; diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/RequestEventHandlerTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/RequestEventHandlerTests.cs index f5602a1f29..9a19fb4a90 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/RequestEventHandlerTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/RequestEventHandlerTests.cs @@ -82,7 +82,7 @@ private StoreClient GetMockStoreClient() TransportClient mockTransportClient = this.GetMockTransportClient(); ISessionContainer sessionContainer = new SessionContainer(string.Empty); - StoreReader storeReader = new StoreReader(mockTransportClient, addressSelector, new AddressEnumerator(), sessionContainer); + StoreReader storeReader = new StoreReader(mockTransportClient, addressSelector, new AddressEnumerator(), sessionContainer, false); Mock mockAuthorizationTokenProvider = new Mock(); mockAuthorizationTokenProvider.Setup(provider => provider.AddSystemAuthorizationHeaderAsync( diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/StoreReaderTest.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/StoreReaderTest.cs index 609817f3de..ac9d3f95f2 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/StoreReaderTest.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/StoreReaderTest.cs @@ -536,7 +536,8 @@ public void StoreReaderBarrierTest() new StoreReader(mockTransportClient, addressSelector, new AddressEnumerator(), - sessionContainer); + sessionContainer, + enableReplicaValidation: false); // reads always go to read quorum (2) replicas int replicaCountToRead = 2; @@ -611,14 +612,14 @@ public void GlobalStrongConsistentWriteMockTest() for (int i = 0; i < addressInformation.Length; i++) { TransportClient mockTransportClient = this.GetMockTransportClientForGlobalStrongWrites(addressInformation, i, false, false, false); - StoreReader storeReader = new StoreReader(mockTransportClient, addressSelector, new AddressEnumerator(), sessionContainer); - ConsistencyWriter consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false); + StoreReader storeReader = new StoreReader(mockTransportClient, addressSelector, new AddressEnumerator(), sessionContainer, false); + ConsistencyWriter consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false, false); StoreResponse response = consistencyWriter.WriteAsync(entity, new TimeoutHelper(TimeSpan.FromSeconds(30)), false).Result; Assert.AreEqual(100, response.LSN); //globalCommittedLsn never catches up in this case mockTransportClient = this.GetMockTransportClientForGlobalStrongWrites(addressInformation, i, true, false, false); - consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false); + consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false, false); try { response = consistencyWriter.WriteAsync(entity, new TimeoutHelper(TimeSpan.FromSeconds(30)), false).Result; @@ -629,17 +630,17 @@ public void GlobalStrongConsistentWriteMockTest() } mockTransportClient = this.GetMockTransportClientForGlobalStrongWrites(addressInformation, i, false, true, false); - consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false); + consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false, false); response = consistencyWriter.WriteAsync(entity, new TimeoutHelper(TimeSpan.FromSeconds(30)), false).Result; Assert.AreEqual(100, response.LSN); mockTransportClient = this.GetMockTransportClientForGlobalStrongWrites(addressInformation, i, false, true, true); - consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false); + consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false, false); response = consistencyWriter.WriteAsync(entity, new TimeoutHelper(TimeSpan.FromSeconds(30)), false).Result; Assert.AreEqual(100, response.LSN); mockTransportClient = this.GetMockTransportClientForGlobalStrongWrites(addressInformation, i, false, false, true); - consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false); + consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false, false); response = consistencyWriter.WriteAsync(entity, new TimeoutHelper(TimeSpan.FromSeconds(30)), false).Result; Assert.AreEqual(100, response.LSN); } @@ -703,7 +704,8 @@ public void GlobalStrongConsistencyMockTest() new StoreReader(mockTransportClient, addressSelector, new AddressEnumerator(), - sessionContainer); + sessionContainer, + false); Mock mockAuthorizationTokenProvider = new Mock(); mockAuthorizationTokenProvider.Setup(provider => provider.AddSystemAuthorizationHeaderAsync( @@ -746,7 +748,8 @@ public void GlobalStrongConsistencyMockTest() new StoreReader(mockTransportClient, addressSelector, new AddressEnumerator(), - sessionContainer); + sessionContainer, + false); Mock mockAuthorizationTokenProvider = new Mock(); mockAuthorizationTokenProvider.Setup(provider => provider.AddSystemAuthorizationHeaderAsync( @@ -798,7 +801,8 @@ public void GlobalStrongConsistencyMockTest() new StoreReader(mockTransportClient, addressSelector, new AddressEnumerator(), - sessionContainer); + sessionContainer, + false); Mock mockAuthorizationTokenProvider = new Mock(); mockAuthorizationTokenProvider.Setup(provider => provider.AddSystemAuthorizationHeaderAsync(