From 6dc8ff9e66268d2891e73e48911464950d9c47c2 Mon Sep 17 00:00:00 2001 From: Steve Gordon Date: Mon, 25 Nov 2024 15:45:07 +0000 Subject: [PATCH] Ensure we track retry attempts per request (#146) As the pipeline is now created once per transport, we need to track the retires outside of the pipeline. It also fixes two other bugs where we don't null check correctly. Closes #145 --- .../Components/Pipeline/RequestPipeline.cs | 17 ++-- .../TransportClient/HttpRequestInvoker.cs | 18 +--- .../TransportClient/HttpWebRequestInvoker.cs | 11 +-- .../TransportClient/RequestInvokerHelpers.cs | 24 ++++++ src/Elastic.Transport/DistributedTransport.cs | 7 +- .../NodePool/StaticNodePoolTests.cs | 60 ++++++++++++++ .../TransportClient/RequestInvokerTests.cs | 83 +++++++++++++++++++ 7 files changed, 181 insertions(+), 39 deletions(-) create mode 100644 src/Elastic.Transport/Components/TransportClient/RequestInvokerHelpers.cs create mode 100644 tests/Elastic.Transport.Tests/Components/NodePool/StaticNodePoolTests.cs create mode 100644 tests/Elastic.Transport.Tests/Components/TransportClient/RequestInvokerTests.cs diff --git a/src/Elastic.Transport/Components/Pipeline/RequestPipeline.cs b/src/Elastic.Transport/Components/Pipeline/RequestPipeline.cs index d981df6..a3b73b0 100644 --- a/src/Elastic.Transport/Components/Pipeline/RequestPipeline.cs +++ b/src/Elastic.Transport/Components/Pipeline/RequestPipeline.cs @@ -64,7 +64,7 @@ private RequestConfiguration PingAndSniffRequestConfiguration } } - private bool DepletedRetries(DateTimeOffset startedOn) => Retried >= MaxRetries + 1 || IsTakingTooLong(startedOn); + private bool DepletedRetries(DateTimeOffset startedOn, int attemptedNodes) => attemptedNodes >= MaxRetries + 1 || IsTakingTooLong(startedOn); private bool FirstPoolUsageNeedsSniffing => !RequestDisabledSniff @@ -87,8 +87,6 @@ private bool IsTakingTooLong(DateTimeOffset startedOn) private bool Refresh { get; set; } - private int Retried { get; set; } - private IEnumerable SniffNodes(Auditor? auditor) => _nodePool .CreateView(auditor) .ToList() @@ -204,6 +202,7 @@ private async ValueTask CallProductEndpointCoreAsync( Endpoint endpoint, Auditor? auditor, DateTimeOffset startedOn, + int attemptedNodes, List? seenExceptions ) where TResponse : TransportResponse, new() @@ -231,7 +230,7 @@ private async ValueTask CallProductEndpointCoreAsync( auditor?.Emit(MaxTimeoutReached); exceptionMessage = "Maximum timeout reached while retrying request"; } - else if (Retried >= MaxRetries && MaxRetries > 0) + else if (attemptedNodes >= MaxRetries && MaxRetries > 0) { pipelineFailure = PipelineFailure.MaxRetriesReached; auditor?.Emit(MaxRetriesReached); @@ -239,7 +238,7 @@ private async ValueTask CallProductEndpointCoreAsync( var now = _dateTimeProvider.Now(); var activeNodes = _nodePool.Nodes.Count(n => n.IsAlive || n.DeadUntil <= now); - if (Retried >= activeNodes) + if (attemptedNodes >= activeNodes) { auditor?.Emit(FailedOverAllNodes); exceptionMessage += ", failed over to all the known alive nodes before failing"; @@ -336,7 +335,6 @@ public void MarkDead(Node node) { var deadUntil = _dateTimeProvider.DeadTime(node.FailedAttempts, _settings.DeadTimeout, _settings.MaxDeadTimeout); node.MarkDead(deadUntil); - Retried++; } /// Fast path for if only a single node could ever be yielded this save an IEnumerator allocation @@ -355,12 +353,11 @@ public bool TryGetSingleNode(out Node? node) } /// returns a consistent enumerable view into the available nodes - public IEnumerable NextNode(DateTimeOffset startedOn, Auditor? auditor) + public IEnumerable NextNode(DateTimeOffset startedOn, int attemptedNodes, Auditor? auditor) { if (_boundConfiguration.ForceNode != null) { yield return new Node(_boundConfiguration.ForceNode); - yield break; } @@ -370,11 +367,11 @@ public IEnumerable NextNode(DateTimeOffset startedOn, Auditor? auditor) var refreshed = false; for (var i = 0; i < 100; i++) { - if (DepletedRetries(startedOn)) yield break; + if (DepletedRetries(startedOn, attemptedNodes)) yield break; foreach (var node in _nodePool.CreateView(auditor)) { - if (DepletedRetries(startedOn)) break; + if (DepletedRetries(startedOn, attemptedNodes)) break; if (!_nodePredicate(node)) continue; diff --git a/src/Elastic.Transport/Components/TransportClient/HttpRequestInvoker.cs b/src/Elastic.Transport/Components/TransportClient/HttpRequestInvoker.cs index fb743c1..779267f 100644 --- a/src/Elastic.Transport/Components/TransportClient/HttpRequestInvoker.cs +++ b/src/Elastic.Transport/Components/TransportClient/HttpRequestInvoker.cs @@ -51,13 +51,6 @@ internal HttpRequestInvoker(ResponseFactory responseFactory) public HttpRequestInvoker(Func wrappingHandler) : this(wrappingHandler, new DefaultResponseFactory()) { } - /// - /// Allows consumers to inject their own HttpMessageHandler, and optionally call our default implementation. - /// - public HttpRequestInvoker(Func wrappingHandler, ITransportConfiguration transportConfiguration) : - this(wrappingHandler, new DefaultResponseFactory()) - { } - internal HttpRequestInvoker(Func wrappingHandler, ResponseFactory responseFactory) { ResponseFactory = responseFactory; @@ -203,16 +196,7 @@ private async ValueTask RequestCoreAsync(bool isAsync, End receivedResponse?.Dispose(); } - if (!OpenTelemetry.CurrentSpanIsElasticTransportOwnedAndHasListeners || (!(Activity.Current?.IsAllDataRequested ?? false))) - return response; - - var attributes = boundConfiguration.ConnectionSettings.ProductRegistration.ParseOpenTelemetryAttributesFromApiCallDetails(response.ApiCallDetails); - - if (attributes is null) return response; - - foreach (var attribute in attributes) - Activity.Current?.SetTag(attribute.Key, attribute.Value); - + RequestInvokerHelpers.SetOtelAttributes(boundConfiguration, response); return response; } catch diff --git a/src/Elastic.Transport/Components/TransportClient/HttpWebRequestInvoker.cs b/src/Elastic.Transport/Components/TransportClient/HttpWebRequestInvoker.cs index c20315c..c2f99bf 100644 --- a/src/Elastic.Transport/Components/TransportClient/HttpWebRequestInvoker.cs +++ b/src/Elastic.Transport/Components/TransportClient/HttpWebRequestInvoker.cs @@ -194,14 +194,7 @@ private async ValueTask RequestCoreAsync(bool isAsync, End receivedResponse?.Dispose(); } - if (OpenTelemetry.CurrentSpanIsElasticTransportOwnedAndHasListeners && (Activity.Current?.IsAllDataRequested ?? false)) - { - var attributes = boundConfiguration.ConnectionSettings.ProductRegistration.ParseOpenTelemetryAttributesFromApiCallDetails(response.ApiCallDetails); - foreach (var attribute in attributes) - { - Activity.Current?.SetTag(attribute.Key, attribute.Value); - } - } + RequestInvokerHelpers.SetOtelAttributes(boundConfiguration, response); return response; } @@ -421,7 +414,7 @@ protected virtual void SetProxyIfNeeded(HttpWebRequest request, BoundConfigurati protected virtual void SetAuthenticationIfNeeded(Endpoint endpoint, BoundConfiguration boundConfiguration, HttpWebRequest request) { //If user manually specifies an Authorization Header give it preference - if (boundConfiguration.Headers.HasKeys() && boundConfiguration.Headers.AllKeys.Contains("Authorization")) + if (boundConfiguration.Headers is not null && boundConfiguration.Headers.HasKeys() && boundConfiguration.Headers.AllKeys.Contains("Authorization")) { var header = boundConfiguration.Headers["Authorization"]; request.Headers["Authorization"] = header; diff --git a/src/Elastic.Transport/Components/TransportClient/RequestInvokerHelpers.cs b/src/Elastic.Transport/Components/TransportClient/RequestInvokerHelpers.cs new file mode 100644 index 0000000..a1d0ec8 --- /dev/null +++ b/src/Elastic.Transport/Components/TransportClient/RequestInvokerHelpers.cs @@ -0,0 +1,24 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System.Diagnostics; +using Elastic.Transport.Diagnostics; + +namespace Elastic.Transport; + +internal static class RequestInvokerHelpers +{ + public static void SetOtelAttributes(BoundConfiguration boundConfiguration, TResponse response) where TResponse : TransportResponse + { + if (!OpenTelemetry.CurrentSpanIsElasticTransportOwnedAndHasListeners || (!(Activity.Current?.IsAllDataRequested ?? false))) + return; + + var attributes = boundConfiguration.ConnectionSettings.ProductRegistration.ParseOpenTelemetryAttributesFromApiCallDetails(response.ApiCallDetails); + + if (attributes is null) return; + + foreach (var attribute in attributes) + Activity.Current?.SetTag(attribute.Key, attribute.Value); + } +} diff --git a/src/Elastic.Transport/DistributedTransport.cs b/src/Elastic.Transport/DistributedTransport.cs index 8364fd4..5a2154d 100644 --- a/src/Elastic.Transport/DistributedTransport.cs +++ b/src/Elastic.Transport/DistributedTransport.cs @@ -169,7 +169,7 @@ private async ValueTask RequestCoreAsync( } else { - foreach (var node in pipeline.NextNode(startedOn, auditor)) + foreach (var node in pipeline.NextNode(startedOn, attemptedNodes, auditor)) { attemptedNodes++; endpoint = endpoint with { Node = node }; @@ -265,7 +265,7 @@ private async ValueTask RequestCoreAsync( if (activity is { IsAllDataRequested: true }) OpenTelemetry.SetCommonAttributes(activity, Configuration); - return FinalizeResponse(endpoint, boundConfiguration, data, pipeline, startedOn, auditor, seenExceptions, response); + return FinalizeResponse(endpoint, boundConfiguration, data, pipeline, startedOn, attemptedNodes, auditor, seenExceptions, response); } finally { @@ -303,6 +303,7 @@ private TResponse FinalizeResponse( PostData? postData, RequestPipeline pipeline, DateTimeOffset startedOn, + int attemptedNodes, Auditor auditor, List? seenExceptions, TResponse? response @@ -312,7 +313,7 @@ private TResponse FinalizeResponse( pipeline.ThrowNoNodesAttempted(endpoint, auditor, seenExceptions); var callDetails = GetMostRecentCallDetails(response, seenExceptions); - var clientException = pipeline.CreateClientException(response, callDetails, endpoint, auditor, startedOn, seenExceptions); + var clientException = pipeline.CreateClientException(response, callDetails, endpoint, auditor, startedOn, attemptedNodes, seenExceptions); if (response?.ApiCallDetails == null) pipeline.BadResponse(ref response, callDetails, endpoint, boundConfiguration, postData, clientException, auditor); diff --git a/tests/Elastic.Transport.Tests/Components/NodePool/StaticNodePoolTests.cs b/tests/Elastic.Transport.Tests/Components/NodePool/StaticNodePoolTests.cs new file mode 100644 index 0000000..4b18c4f --- /dev/null +++ b/tests/Elastic.Transport.Tests/Components/NodePool/StaticNodePoolTests.cs @@ -0,0 +1,60 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using Xunit; +using System; +using FluentAssertions; +using System.Linq; + +namespace Elastic.Transport.Tests.Components.NodePool +{ + public class StaticNodePoolTests + { + [Fact] + public void MultipleRequests_WhenOnlyASingleEndpointIsConfigured_AndTheEndpointIsUnavailable_DoNotThrowAnException() + { + Node[] nodes = [new Uri("http://localhost:9200")]; + var pool = new StaticNodePool(nodes); + var transport = new DistributedTransport(new TransportConfiguration(pool)); + + var response = transport.Request(HttpMethod.GET, "/", null, null); + + response.ApiCallDetails.SuccessOrKnownError.Should().BeFalse(); + response.ApiCallDetails.AuditTrail.Count.Should().Be(1); + + var audit = response.ApiCallDetails.AuditTrail.First(); + audit.Event.Should().Be(Diagnostics.Auditing.AuditEvent.BadRequest); + audit.Node.FailedAttempts.Should().Be(1); + audit.Node.IsAlive.Should().BeFalse(); + + response = transport.Request(HttpMethod.GET, "/", null, null); + + response.ApiCallDetails.SuccessOrKnownError.Should().BeFalse(); + + var eventCount = 0; + + foreach (var a in response.ApiCallDetails.AuditTrail) + { + eventCount++; + + if (eventCount == 1) + { + a.Event.Should().Be(Diagnostics.Auditing.AuditEvent.AllNodesDead); + } + + if (eventCount == 2) + { + a.Event.Should().Be(Diagnostics.Auditing.AuditEvent.Resurrection); + } + + if (eventCount == 3) + { + a.Event.Should().Be(Diagnostics.Auditing.AuditEvent.BadRequest); + audit.Node.FailedAttempts.Should().Be(2); + audit.Node.IsAlive.Should().BeFalse(); + } + } + } + } +} diff --git a/tests/Elastic.Transport.Tests/Components/TransportClient/RequestInvokerTests.cs b/tests/Elastic.Transport.Tests/Components/TransportClient/RequestInvokerTests.cs new file mode 100644 index 0000000..2935189 --- /dev/null +++ b/tests/Elastic.Transport.Tests/Components/TransportClient/RequestInvokerTests.cs @@ -0,0 +1,83 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using Xunit; +using System; +using FluentAssertions; +using System.Threading.Tasks; +using System.Threading; +using System.Diagnostics; +using System.Collections.Generic; +using System.IO; +using Elastic.Transport.Diagnostics; +using System.Net.NetworkInformation; + +namespace Elastic.Transport.Tests.Components.TransportClient +{ + public class RequestInvokerTests + { + [Fact] + public void NoExceptionShouldBeThrown_WhenHttpResponseDoesNotIncludeCloudHeaders() + { + // This test validates that if `ProductRegistration.ParseOpenTelemetryAttributesFromApiCallDetails` returns null, + // no exception is thrown and attributes are not set. + + using var listener = new ActivityListener + { + ActivityStarted = _ => { }, + ActivityStopped = activity => { }, + ShouldListenTo = activitySource => activitySource.Name == OpenTelemetry.ElasticTransportActivitySourceName, + Sample = (ref ActivityCreationOptions _) => ActivitySamplingResult.AllData + }; + ActivitySource.AddActivityListener(listener); + + var requestInvoker = new HttpRequestInvoker(new TestResponseFactory()); + var pool = new SingleNodePool(new Uri("http://localhost:9200")); + var config = new TransportConfiguration(pool, requestInvoker); + var transport = new DistributedTransport(config); + + var response = transport.Head("/"); + response.ApiCallDetails.HttpStatusCode.Should().Be(200); + } + + private sealed class TestResponseFactory : ResponseFactory + { + public override TResponse Create( + Endpoint endpoint, + BoundConfiguration boundConfiguration, + PostData postData, + Exception ex, + int? statusCode, + Dictionary> headers, + Stream responseStream, + string contentType, + long contentLength, + IReadOnlyDictionary threadPoolStats, + IReadOnlyDictionary tcpStats) => CreateResponse(); + + public override Task CreateAsync( + Endpoint endpoint, + BoundConfiguration boundConfiguration, + PostData postData, + Exception ex, + int? statusCode, + Dictionary> headers, + Stream responseStream, + string contentType, + long contentLength, + IReadOnlyDictionary threadPoolStats, + IReadOnlyDictionary tcpStats, + CancellationToken cancellationToken = default) + { + var response = CreateResponse(); + return Task.FromResult(response); + } + + private static TResponse CreateResponse() where TResponse : TransportResponse, new() => new TResponse + { + ApiCallDetails = new() { HttpStatusCode = 200, Uri = new Uri("http://localhost/") } + }; + } + } +}