Skip to content

Commit

Permalink
Code changes to open connections in parallel. Added replica validatio…
Browse files Browse the repository at this point in the history
…n in contract.
  • Loading branch information
kundadebdatta committed Jun 20, 2023
1 parent 9b66ef7 commit 946bc4a
Show file tree
Hide file tree
Showing 17 changed files with 109 additions and 37 deletions.
13 changes: 13 additions & 0 deletions Microsoft.Azure.Cosmos/src/ConnectionPolicy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public ConnectionPolicy()
this.EnableReadRequestsFallback = null;
this.EnableClientTelemetry = ClientTelemetryOptions.IsClientTelemetryEnabled();
this.ServerCertificateCustomValidationCallback = null;
this.EnableReplicaValidation = false;
}

/// <summary>
Expand Down Expand Up @@ -459,6 +460,18 @@ public Func<HttpClient> HttpClientFactory
set;
}

/// <summary>
/// Gets or sets the flag to enable replica validation.
/// </summary>
/// <value>
/// The default value is false
/// </value>
public bool EnableReplicaValidation
{
get;
set;
}

/// <summary>
/// (Direct/TCP) This is an advanced setting that controls the number of TCP connections that will be opened eagerly to each Cosmos DB back-end.
/// </summary>
Expand Down
13 changes: 13 additions & 0 deletions Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,18 @@ public ConnectionMode ConnectionMode
/// <seealso cref="TransactionalBatchItemRequestOptions.EnableContentResponseOnWrite"/>
public bool? EnableContentResponseOnWrite { get; set; }

/// <summary>
/// Gets or sets the replica validation flag.
/// Enabling replica validation helps the cosmos client to become more
/// resilient to service upgrades by choosing a healthy replica over the
/// one undergoing an upgrade. The default value for this parameter is false.
/// </summary>
/// <remarks>
/// <para>This is optimal for workloads where latency spikes are critical during upgrades.</para>
/// </remarks>
/// <seealso cref="CosmosClientBuilder.WithReplicaValidation(bool)"/>
public bool EnableReplicaValidation { get; set; } = false;

/// <summary>
/// (Direct/TCP) Controls the amount of idle time after which unused connections are closed.
/// </summary>
Expand Down Expand Up @@ -758,6 +770,7 @@ internal virtual ConnectionPolicy GetConnectionPolicy(int clientId)
EnablePartitionLevelFailover = this.EnablePartitionLevelFailover,
PortReuseMode = this.portReuseMode,
EnableTcpConnectionEndpointRediscovery = this.EnableTcpConnectionEndpointRediscovery,
EnableReplicaValidation = this.EnableReplicaValidation,
HttpClientFactory = this.httpClientFactory,
ServerCertificateCustomValidationCallback = this.ServerCertificateCustomValidationCallback
};
Expand Down
3 changes: 2 additions & 1 deletion Microsoft.Azure.Cosmos/src/DocumentClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6700,7 +6700,8 @@ private void CreateStoreModel(bool subscribeRntbdStatus)
this.ConnectionPolicy.EnableReadRequestsFallback ?? (this.accountServiceConfiguration.DefaultConsistencyLevel != Documents.ConsistencyLevel.BoundedStaleness),
!this.enableRntbdChannel,
this.UseMultipleWriteLocations && (this.accountServiceConfiguration.DefaultConsistencyLevel != Documents.ConsistencyLevel.Strong),
true);
true,
this.ConnectionPolicy.EnableReplicaValidation);

if (subscribeRntbdStatus)
{
Expand Down
21 changes: 21 additions & 0 deletions Microsoft.Azure.Cosmos/src/Fluent/CosmosClientBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,27 @@ public CosmosClientBuilder WithContentResponseOnWrite(bool contentResponseOnWrit
return this;
}

/// <summary>
/// Gets or sets the boolean to only return the headers and status code in
/// the Cosmos DB response for write item operation like Create, Upsert, Patch and Replace.
/// Setting the option to false will cause the response to have a null resource. This reduces networking and CPU load by not sending
/// the resource back over the network and serializing it on the client.
/// </summary>
/// <param name="replicaValidationEnabled">a boolean indicating whether payload will be included in the response or not.</param>
/// <remarks>
/// <para>
/// This option can be overriden by similar property in ItemRequestOptions and TransactionalBatchItemRequestOptions
/// </para>
/// </remarks>
/// <returns>The <see cref="CosmosClientBuilder"/> object</returns>
/// <seealso cref="ItemRequestOptions.EnableContentResponseOnWrite"/>
/// <seealso cref="TransactionalBatchItemRequestOptions.EnableContentResponseOnWrite"/>
public CosmosClientBuilder WithReplicaValidation(bool replicaValidationEnabled)
{
this.clientOptions.EnableReplicaValidation = replicaValidationEnabled;
return this;
}

/// <summary>
/// The event handler to be invoked before the request is sent.
/// </summary>
Expand Down
20 changes: 13 additions & 7 deletions Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ public GatewayAddressCache(
CosmosHttpClient httpClient,
IOpenConnectionsHandler openConnectionsHandler,
long suboptimalPartitionForceRefreshIntervalInSeconds = 600,
bool enableTcpConnectionEndpointRediscovery = false)
bool enableTcpConnectionEndpointRediscovery = false,
bool replicaAddressValidationEnabled = false)
{
this.addressEndpoint = new Uri(serviceEndpoint + "/" + Paths.AddressPathSegment);
this.protocol = protocol;
Expand All @@ -85,9 +86,7 @@ public GatewayAddressCache(
GatewayAddressCache.ProtocolString(this.protocol));

this.openConnectionsHandler = openConnectionsHandler;
this.isReplicaAddressValidationEnabled = Helpers.GetEnvironmentVariable<bool>(
name: Constants.EnvironmentVariables.ReplicaConnectivityValidationEnabled,
defaultValue: false);
this.isReplicaAddressValidationEnabled = replicaAddressValidationEnabled;
}

public Uri ServiceEndpoint => this.serviceEndpoint;
Expand Down Expand Up @@ -173,6 +172,7 @@ public async Task OpenConnectionsAsync(
.GroupBy(address => address.PartitionKeyRangeId, StringComparer.Ordinal)
.Select(group => this.ToPartitionAddressAndRange(collection.ResourceId, @group.ToList(), inNetworkRequest));

List<Task> openConnectionTasks = new ();
foreach (Tuple<PartitionKeyRangeIdentity, PartitionAddressInformation> addressInfo in addressInfos)
{
this.serverPartitionAddressCache.Set(
Expand All @@ -185,11 +185,17 @@ public async Task OpenConnectionsAsync(
// other flow, the flag should be passed as `false`.
if (this.openConnectionsHandler != null && shouldOpenRntbdChannels)
{
await this.openConnectionsHandler
.TryOpenRntbdChannelsAsync(
addresses: addressInfo.Item2.Get(Protocol.Tcp)?.ReplicaTransportAddressUris);
openConnectionTasks
.Add(this.openConnectionsHandler
.TryOpenRntbdChannelsAsync(
addresses: addressInfo.Item2.Get(Protocol.Tcp)?.ReplicaTransportAddressUris));
}
}

if (openConnectionTasks.Any())
{
await Task.WhenAll(openConnectionTasks);
}
}
}
}
Expand Down
5 changes: 4 additions & 1 deletion Microsoft.Azure.Cosmos/src/Routing/GlobalAddressResolver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ internal sealed class GlobalAddressResolver : IAddressResolverExtension, IDispos
private readonly CosmosHttpClient httpClient;
private readonly ConcurrentDictionary<Uri, EndpointCache> addressCacheByEndpoint;
private readonly bool enableTcpConnectionEndpointRediscovery;
private readonly bool replicaAddressValidationEnabled;
private IOpenConnectionsHandler openConnectionsHandler;

public GlobalAddressResolver(
Expand Down Expand Up @@ -66,6 +67,7 @@ public GlobalAddressResolver(
? GlobalAddressResolver.MaxBackupReadRegions : 0;

this.enableTcpConnectionEndpointRediscovery = connectionPolicy.EnableTcpConnectionEndpointRediscovery;
this.replicaAddressValidationEnabled = connectionPolicy.EnableReplicaValidation;

this.maxEndpoints = maxBackupReadEndpoints + 2; // for write and alternate write endpoint (during failover)

Expand Down Expand Up @@ -281,7 +283,8 @@ private EndpointCache GetOrAddEndpoint(Uri endpoint)
this.serviceConfigReader,
this.httpClient,
this.openConnectionsHandler,
enableTcpConnectionEndpointRediscovery: this.enableTcpConnectionEndpointRediscovery);
enableTcpConnectionEndpointRediscovery: this.enableTcpConnectionEndpointRediscovery,
replicaAddressValidationEnabled: this.replicaAddressValidationEnabled);
string location = this.endpointManager.GetLocation(endpoint);
AddressResolver addressResolver = new AddressResolver(null, new NullRequestSigner(), location);
Expand Down
5 changes: 3 additions & 2 deletions Microsoft.Azure.Cosmos/src/direct/ConsistencyReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -144,12 +144,13 @@ public ConsistencyReader(
ISessionContainer sessionContainer,
TransportClient transportClient,
IServiceConfigurationReader serviceConfigReader,
IAuthorizationTokenProvider authorizationTokenProvider)
IAuthorizationTokenProvider authorizationTokenProvider,
bool enableReplicaValidation)
{
this.addressSelector = addressSelector;
this.serviceConfigReader = serviceConfigReader;
this.authorizationTokenProvider = authorizationTokenProvider;
this.storeReader = new StoreReader(transportClient, addressSelector, new AddressEnumerator(), sessionContainer);
this.storeReader = new StoreReader(transportClient, addressSelector, new AddressEnumerator(), sessionContainer, enableReplicaValidation);
this.quorumReader = new QuorumReader(transportClient, addressSelector, this.storeReader, serviceConfigReader, authorizationTokenProvider);
}

Expand Down
6 changes: 4 additions & 2 deletions Microsoft.Azure.Cosmos/src/direct/ConsistencyWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ public ConsistencyWriter(
TransportClient transportClient,
IServiceConfigurationReader serviceConfigReader,
IAuthorizationTokenProvider authorizationTokenProvider,
bool useMultipleWriteLocations)
bool useMultipleWriteLocations,
bool enableReplicaValidation)
{
this.transportClient = transportClient;
this.addressSelector = addressSelector;
Expand All @@ -71,7 +72,8 @@ public ConsistencyWriter(
transportClient,
addressSelector,
new AddressEnumerator(),
sessionContainer: null); //we need store reader only for global strong, no session is needed*/
sessionContainer: null,
enableReplicaValidation); //we need store reader only for global strong, no session is needed*/
}

// Test hook
Expand Down
1 change: 0 additions & 1 deletion Microsoft.Azure.Cosmos/src/direct/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2134,7 +2134,6 @@ public static class EnvironmentVariables
{
public const string SocketOptionTcpKeepAliveIntervalName = "AZURE_COSMOS_TCP_KEEPALIVE_INTERVAL_SECONDS";
public const string SocketOptionTcpKeepAliveTimeName = "AZURE_COSMOS_TCP_KEEPALIVE_TIME_SECONDS";
public const string ReplicaConnectivityValidationEnabled = "AZURE_COSMOS_REPLICA_VALIDATION_ENABLED";
public const string AggressiveTimeoutDetectionEnabled = "AZURE_COSMOS_AGGRESSIVE_TIMEOUT_DETECTION_ENABLED";
public const string TimeoutDetectionTimeLimit = "AZURE_COSMOS_TIMEOUT_DETECTION_TIME_LIMIT_IN_SECONDS";
public const string TimeoutDetectionOnWriteThreshold = "AZURE_COSMOS_TIMEOUT_DETECTION_ON_WRITE_THRESHOLD";
Expand Down
3 changes: 2 additions & 1 deletion Microsoft.Azure.Cosmos/src/direct/IStoreClientFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ StoreClient CreateStoreClient(
bool enableReadRequestsFallback = false,
bool useFallbackClient = true,
bool useMultipleWriteLocations = false,
bool detectClientConnectivityIssues = false);
bool detectClientConnectivityIssues = false,
bool enableReplicaValidation = false);
}
}
7 changes: 5 additions & 2 deletions Microsoft.Azure.Cosmos/src/direct/ReplicatedResourceClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public ReplicatedResourceClient(
bool useMultipleWriteLocations,
bool detectClientConnectivityIssues,
bool disableRetryWithRetryPolicy,
bool enableReplicaValidation,
RetryWithConfiguration retryWithConfiguration = null)
{
this.addressResolver = addressResolver;
Expand All @@ -86,14 +87,16 @@ public ReplicatedResourceClient(
sessionContainer,
transportClient,
serviceConfigReader,
authorizationTokenProvider);
authorizationTokenProvider,
enableReplicaValidation);
this.consistencyWriter = new ConsistencyWriter(
this.addressSelector,
sessionContainer,
transportClient,
serviceConfigReader,
authorizationTokenProvider,
useMultipleWriteLocations);
useMultipleWriteLocations,
enableReplicaValidation);
this.enableReadRequestsFallback = enableReadRequestsFallback;
this.useMultipleWriteLocations = useMultipleWriteLocations;
this.detectClientConnectivityIssues = detectClientConnectivityIssues;
Expand Down
4 changes: 3 additions & 1 deletion Microsoft.Azure.Cosmos/src/direct/StoreClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public StoreClient(
bool useMultipleWriteLocations = false,
bool detectClientConnectivityIssues = false,
bool disableRetryWithRetryPolicy = false,
bool enableReplicaValidation = false,
RetryWithConfiguration retryWithConfiguration = null)
{
this.transportClient = transportClient;
Expand Down Expand Up @@ -68,7 +69,8 @@ public StoreClient(
useMultipleWriteLocations: useMultipleWriteLocations,
detectClientConnectivityIssues: detectClientConnectivityIssues,
disableRetryWithRetryPolicy: disableRetryWithRetryPolicy,
retryWithConfiguration: retryWithConfiguration);
retryWithConfiguration: retryWithConfiguration,
enableReplicaValidation: enableReplicaValidation);
}

internal JsonSerializerSettings SerializerSettings { get; set; }
Expand Down
9 changes: 6 additions & 3 deletions Microsoft.Azure.Cosmos/src/direct/StoreClientFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,8 @@ public StoreClient CreateStoreClient(
bool enableReadRequestsFallback = false,
bool useFallbackClient = true,
bool useMultipleWriteLocations = false,
bool detectClientConnectivityIssues = false)
bool detectClientConnectivityIssues = false,
bool enableReplicaValidation = false)
{
this.ThrowIfDisposed();
if (useFallbackClient && this.fallbackTransportClient != null)
Expand All @@ -326,7 +327,8 @@ public StoreClient CreateStoreClient(
useMultipleWriteLocations: useMultipleWriteLocations,
detectClientConnectivityIssues: detectClientConnectivityIssues,
disableRetryWithRetryPolicy: this.disableRetryWithRetryPolicy,
retryWithConfiguration: this.retryWithConfiguration);
retryWithConfiguration: this.retryWithConfiguration,
enableReplicaValidation: enableReplicaValidation);
}

return new StoreClient(
Expand All @@ -341,7 +343,8 @@ public StoreClient CreateStoreClient(
useMultipleWriteLocations: useMultipleWriteLocations,
detectClientConnectivityIssues: detectClientConnectivityIssues,
disableRetryWithRetryPolicy: this.disableRetryWithRetryPolicy,
retryWithConfiguration: this.retryWithConfiguration);
retryWithConfiguration: this.retryWithConfiguration,
enableReplicaValidation: enableReplicaValidation);
}

#region IDisposable
Expand Down
7 changes: 3 additions & 4 deletions Microsoft.Azure.Cosmos/src/direct/StoreReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,15 @@ public StoreReader(
TransportClient transportClient,
AddressSelector addressSelector,
IAddressEnumerator addressEnumerator,
ISessionContainer sessionContainer)
ISessionContainer sessionContainer,
bool enableReplicaValidation)
{
this.transportClient = transportClient;
this.addressSelector = addressSelector;
this.addressEnumerator = addressEnumerator ?? throw new ArgumentNullException(nameof(addressEnumerator));
this.sessionContainer = sessionContainer;
this.canUseLocalLSNBasedHeaders = VersionUtility.IsLaterThan(HttpConstants.Versions.CurrentVersion, HttpConstants.Versions.v2018_06_18);
this.isReplicaAddressValidationEnabled = Helpers.GetEnvironmentVariable(
name: Constants.EnvironmentVariables.ReplicaConnectivityValidationEnabled,
defaultValue: false);
this.isReplicaAddressValidationEnabled = enableReplicaValidation;
}

// Test hook
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,8 @@ private async Task ValidateConnectTimeoutTriggersClientRetryPolicy(
enableReadRequestsFallback: false,
useMultipleWriteLocations: useMultipleWriteLocations,
detectClientConnectivityIssues: true,
disableRetryWithRetryPolicy: false);
disableRetryWithRetryPolicy: false,
enableReplicaValidation: false);

// Reducing retry timeout to avoid long-running tests
replicatedResourceClient.GoneAndRetryWithRetryTimeoutInSecondsOverride = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ private StoreClient GetMockStoreClient()
TransportClient mockTransportClient = this.GetMockTransportClient();
ISessionContainer sessionContainer = new SessionContainer(string.Empty);

StoreReader storeReader = new StoreReader(mockTransportClient, addressSelector, new AddressEnumerator(), sessionContainer);
StoreReader storeReader = new StoreReader(mockTransportClient, addressSelector, new AddressEnumerator(), sessionContainer, false);

Mock<IAuthorizationTokenProvider> mockAuthorizationTokenProvider = new Mock<IAuthorizationTokenProvider>();
mockAuthorizationTokenProvider.Setup(provider => provider.AddSystemAuthorizationHeaderAsync(
Expand Down
Loading

0 comments on commit 946bc4a

Please sign in to comment.