Skip to content

Commit

Permalink
Long Send Resiliency (#89)
Browse files Browse the repository at this point in the history
* LongSendTest

* Moved to separate project

* Server transport throws early

* raised timeouts

* Use poly for timeout

* Cleaned up some of the AsyncPolicy construction

* GraphQL Client back to 30 second timeout
  • Loading branch information
JR-Morgan authored Aug 21, 2024
1 parent 9bea79b commit 9916048
Show file tree
Hide file tree
Showing 11 changed files with 363 additions and 33 deletions.
7 changes: 7 additions & 0 deletions Speckle.Sdk.sln
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "workflows", "workflows", "{
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Speckle.Sdk.Logging", "src\Speckle.Sdk.Logging\Speckle.Sdk.Logging.csproj", "{156313B4-B588-4363-A0ED-5AB3A55AA4E8}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Speckle.LongSendProblem", "tests\Speckle.LongSendProblem\Speckle.LongSendProblem.csproj", "{6B8D2DD8-AD20-4021-975C-6FDE71DC425B}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -82,6 +84,10 @@ Global
{156313B4-B588-4363-A0ED-5AB3A55AA4E8}.Debug|Any CPU.Build.0 = Debug|Any CPU
{156313B4-B588-4363-A0ED-5AB3A55AA4E8}.Release|Any CPU.ActiveCfg = Release|Any CPU
{156313B4-B588-4363-A0ED-5AB3A55AA4E8}.Release|Any CPU.Build.0 = Release|Any CPU
{6B8D2DD8-AD20-4021-975C-6FDE71DC425B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{6B8D2DD8-AD20-4021-975C-6FDE71DC425B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{6B8D2DD8-AD20-4021-975C-6FDE71DC425B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{6B8D2DD8-AD20-4021-975C-6FDE71DC425B}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{A413E196-3696-4F48-B635-04B5F76BF9C9} = {5CB96C27-FC5B-4A41-86B6-951AF99B8116}
Expand All @@ -93,5 +99,6 @@ Global
{4FB41A6D-D139-4111-8115-E3F9F6BEAF24} = {35047EE7-AD1D-4741-80A7-8F0E874718E9}
{B623BD21-5CAA-43F9-A539-1835276C220E} = {DA2AED52-58F9-471E-8AD8-102FD36129E3}
{156313B4-B588-4363-A0ED-5AB3A55AA4E8} = {5CB96C27-FC5B-4A41-86B6-951AF99B8116}
{6B8D2DD8-AD20-4021-975C-6FDE71DC425B} = {35047EE7-AD1D-4741-80A7-8F0E874718E9}
EndGlobalSection
EndGlobal
4 changes: 3 additions & 1 deletion src/Speckle.Sdk/Api/GraphQL/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,9 @@ private static GraphQLHttpClient CreateGraphQLClient(Account account, HttpClient

private static HttpClient CreateHttpClient(Account account)
{
var httpClient = Http.GetHttpProxyClient(timeout: TimeSpan.FromSeconds(30));
var httpClient = Http.GetHttpProxyClient(
new SpeckleHttpClientHandler(new HttpClientHandler(), Http.HttpAsyncPolicy(timeoutSeconds: 30))
);
Http.AddAuthHeader(httpClient, account.token);

httpClient.DefaultRequestHeaders.Add("apollographql-client-name", Setup.ApplicationVersion);
Expand Down
12 changes: 6 additions & 6 deletions src/Speckle.Sdk/Credentials/AccountManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public static class AccountManager
/// <returns></returns>
public static async Task<ServerInfo> GetServerInfo(Uri server, CancellationToken cancellationToken = default)
{
using var httpClient = Http.GetHttpProxyClient(null, null);
using var httpClient = Http.GetHttpProxyClient();

using var gqlClient = new GraphQLHttpClient(
new GraphQLHttpClientOptions
Expand Down Expand Up @@ -102,7 +102,7 @@ public static async Task<UserInfo> GetUserInfo(
CancellationToken cancellationToken = default
)
{
using var httpClient = Http.GetHttpProxyClient(null, null);
using var httpClient = Http.GetHttpProxyClient();
Http.AddAuthHeader(httpClient, token);

using var gqlClient = new GraphQLHttpClient(
Expand Down Expand Up @@ -149,7 +149,7 @@ internal static async Task<ActiveUserServerInfoResponse> GetUserServerInfo(
{
try
{
using var httpClient = Http.GetHttpProxyClient(null, null);
using var httpClient = Http.GetHttpProxyClient();
Http.AddAuthHeader(httpClient, token);

using var client = new GraphQLHttpClient(
Expand Down Expand Up @@ -751,7 +751,7 @@ private static async Task<TokenExchangeResponse> GetToken(string accessCode, str
{
try
{
using var client = Http.GetHttpProxyClient(null, null);
using var client = Http.GetHttpProxyClient();

var body = new
{
Expand Down Expand Up @@ -779,7 +779,7 @@ private static async Task<TokenExchangeResponse> GetRefreshedToken(string refres
{
try
{
using var client = Http.GetHttpProxyClient(null, null);
using var client = Http.GetHttpProxyClient();

var body = new
{
Expand Down Expand Up @@ -811,7 +811,7 @@ private static async Task<TokenExchangeResponse> GetRefreshedToken(string refres
/// <exception cref="HttpRequestException">Request to <paramref name="server"/> failed to send or response was not successful</exception>
private static async Task<bool> IsFrontend2Server(Uri server)
{
using var httpClient = Http.GetHttpProxyClient(null, null);
using var httpClient = Http.GetHttpProxyClient();

var response = await Http.HttpPing(server).ConfigureAwait(false);

Expand Down
32 changes: 21 additions & 11 deletions src/Speckle.Sdk/Helpers/Http.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
using Polly;
using Polly.Contrib.WaitAndRetry;
using Polly.Extensions.Http;
using Polly.Retry;
using Polly.Timeout;
using Speckle.Sdk.Common;
using Speckle.Sdk.Credentials;
using Speckle.Sdk.Logging;
Expand All @@ -13,15 +13,21 @@ namespace Speckle.Sdk.Helpers;

public static class Http
{
public const int DEFAULT_TIMEOUT_SECONDS = 60;

public static IEnumerable<TimeSpan> DefaultDelay()
{
return Backoff.DecorrelatedJitterBackoffV2(TimeSpan.FromMilliseconds(100), 5);
return Backoff.DecorrelatedJitterBackoffV2(TimeSpan.FromMilliseconds(200), 5);
}

public static AsyncRetryPolicy<HttpResponseMessage> HttpAsyncPolicy(IEnumerable<TimeSpan>? delay = null)
public static IAsyncPolicy<HttpResponseMessage> HttpAsyncPolicy(
IEnumerable<TimeSpan>? delay = null,
int timeoutSeconds = DEFAULT_TIMEOUT_SECONDS
)
{
return HttpPolicyExtensions
var retryPolicy = HttpPolicyExtensions
.HandleTransientHttpError()
.Or<TimeoutRejectedException>()
.WaitAndRetryAsync(
delay ?? DefaultDelay(),
(ex, timeSpan, retryAttempt, context) =>
Expand All @@ -30,6 +36,10 @@ public static AsyncRetryPolicy<HttpResponseMessage> HttpAsyncPolicy(IEnumerable<
context.Add("retryCount", retryAttempt);
}
);

var timeoutPolicy = Policy.TimeoutAsync<HttpResponseMessage>(timeoutSeconds);

return Policy.WrapAsync(retryPolicy, timeoutPolicy);
}

/// <summary>
Expand Down Expand Up @@ -128,7 +138,7 @@ public static async Task<HttpResponseMessage> HttpPing(Uri uri)
{
try
{
using var httpClient = GetHttpProxyClient(null, null);
using var httpClient = GetHttpProxyClient();
HttpResponseMessage response = await httpClient.GetAsync(uri).ConfigureAwait(false);
response.EnsureSuccessStatusCode();
SpeckleLog.Logger.Information("Successfully pinged {uri}", uri);
Expand All @@ -141,17 +151,17 @@ public static async Task<HttpResponseMessage> HttpPing(Uri uri)
}
}

public static HttpClient GetHttpProxyClient(
SpeckleHttpClientHandler? speckleHttpClientHandler = null,
TimeSpan? timeout = null
)
public static HttpClient GetHttpProxyClient(SpeckleHttpClientHandler? speckleHttpClientHandler = null)
{
IWebProxy proxy = WebRequest.GetSystemWebProxy();
proxy.Credentials = CredentialCache.DefaultCredentials;

speckleHttpClientHandler ??= new SpeckleHttpClientHandler(new HttpClientHandler());
speckleHttpClientHandler ??= new SpeckleHttpClientHandler(new HttpClientHandler(), HttpAsyncPolicy());

var client = new HttpClient(speckleHttpClientHandler) { Timeout = timeout ?? TimeSpan.FromSeconds(100) };
var client = new HttpClient(speckleHttpClientHandler)
{
Timeout = Timeout.InfiniteTimeSpan //timeout is configured on the SpeckleHttpClientHandler through policy
};
return client;
}

Expand Down
35 changes: 29 additions & 6 deletions src/Speckle.Sdk/Helpers/SpeckleHttpClientHandler.cs
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
using Polly;
using System.Diagnostics;
using Polly;
using Speckle.Sdk.Common;
using Speckle.Sdk.Logging;

namespace Speckle.Sdk.Helpers;

public sealed class SpeckleHttpClientHandler : DelegatingHandler
{
private readonly IEnumerable<TimeSpan> _delay;
private readonly IAsyncPolicy<HttpResponseMessage> _resiliencePolicy;

public SpeckleHttpClientHandler(HttpMessageHandler innerhandler, IEnumerable<TimeSpan>? delay = null)
: base(innerhandler)
public SpeckleHttpClientHandler(HttpMessageHandler innerHandler, IAsyncPolicy<HttpResponseMessage> resiliencePolicy)
: base(innerHandler)
{
_delay = delay ?? Http.DefaultDelay();
_resiliencePolicy = resiliencePolicy;
}

/// <exception cref="OperationCanceledException"><paramref name="cancellationToken"/> requested cancel</exception>
Expand All @@ -23,15 +24,24 @@ CancellationToken cancellationToken
{
// this is a preliminary client server correlation implementation
// refactor this, when we have a better observability stack
var sw = Stopwatch.StartNew();
var context = new Context();
using var activity = SpeckleActivityFactory.Start("Http Send");
{
SpeckleLog.Logger.Debug(
"Starting execution of http request to {targetUrl} {correlationId} {traceId}",
request.RequestUri,
context.CorrelationId,
activity?.TraceId
);
activity?.SetTag("http.url", request.RequestUri);
activity?.SetTag("correlationId", context.CorrelationId);

context.Add("retryCount", 0);

request.Headers.Add("x-request-id", context.CorrelationId.ToString());

var policyResult = await Http.HttpAsyncPolicy(_delay)
var policyResult = await _resiliencePolicy
.ExecuteAndCaptureAsync(
ctx =>
{
Expand All @@ -42,6 +52,19 @@ CancellationToken cancellationToken
.ConfigureAwait(false);
context.TryGetValue("retryCount", out var retryCount);
activity?.SetTag("retryCount", retryCount);

SpeckleLog.Logger.Information(
"Execution of http request to {httpScheme}://{hostUrl}{relativeUrl} {resultStatus} with {httpStatusCode} after {elapsed} seconds and {retryCount} retries. Request correlation ID: {correlationId}",
request.RequestUri.Scheme,
request.RequestUri.Host,
request.RequestUri.PathAndQuery,
policyResult.Outcome == OutcomeType.Successful ? "succeeded" : "failed",
policyResult.Result?.StatusCode,
sw.Elapsed.TotalSeconds,
retryCount ?? 0,
context.CorrelationId.ToString()
);

if (policyResult.Outcome == OutcomeType.Successful)
{
return policyResult.Result.NotNull();
Expand Down
4 changes: 2 additions & 2 deletions src/Speckle.Sdk/Logging/Analytics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ internal static void AddConnectorToProfile(string hashedEmail, string connector)
string json = JsonConvert.SerializeObject(data);
var query = new StreamContent(new MemoryStream(Encoding.UTF8.GetBytes("data=" + HttpUtility.UrlEncode(json))));
using HttpClient client = Http.GetHttpProxyClient(null, null);
using HttpClient client = Http.GetHttpProxyClient();
client.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("text/plain"));
query.Headers.ContentType = new MediaTypeHeaderValue("application/json");
var res = await client.PostAsync(MIXPANEL_SERVER + "/engage#profile-union", query).ConfigureAwait(false);
Expand Down Expand Up @@ -306,7 +306,7 @@ internal static void IdentifyProfile(string hashedEmail, string connector)
string json = JsonConvert.SerializeObject(data);
var query = new StreamContent(new MemoryStream(Encoding.UTF8.GetBytes("data=" + HttpUtility.UrlEncode(json))));
using HttpClient client = Http.GetHttpProxyClient(null, null);
using HttpClient client = Http.GetHttpProxyClient();
client.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("text/plain"));
query.Headers.ContentType = new MediaTypeHeaderValue("application/json");
var res = await client.PostAsync(MIXPANEL_SERVER + "/engage#profile-set", query).ConfigureAwait(false);
Expand Down
10 changes: 5 additions & 5 deletions src/Speckle.Sdk/Transports/ServerUtils/ServerAPI.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,23 @@ public sealed class ServerApi : IDisposable, IServerApi

private readonly HttpClient _client;

public ServerApi(Uri baseUri, string? authorizationToken, string blobStorageFolder, int timeoutSeconds = 60)
public ServerApi(Uri baseUri, string? authorizationToken, string blobStorageFolder, int timeoutSeconds = 120)
{
CancellationToken = CancellationToken.None;

BlobStorageFolder = blobStorageFolder;

_client = Http.GetHttpProxyClient(
new SpeckleHttpClientHandler(new HttpClientHandler { AutomaticDecompression = DecompressionMethods.GZip })
new SpeckleHttpClientHandler(
new HttpClientHandler { AutomaticDecompression = DecompressionMethods.GZip },
Http.HttpAsyncPolicy(timeoutSeconds: timeoutSeconds)
)
);

_client.BaseAddress = baseUri;
_client.Timeout = TimeSpan.FromSeconds(timeoutSeconds);

Http.AddAuthHeader(_client, authorizationToken);
}

private int RetriedCount { get; set; }
public CancellationToken CancellationToken { get; set; }
public bool CompressPayloads { get; set; } = true;

Expand Down
4 changes: 2 additions & 2 deletions src/Speckle.Sdk/Transports/ServerV2.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public void SaveBlob(Blob obj)
{
if (IsInErrorState)
{
return;
throw new TransportException("Server transport is in an errored state", _exception);
}

_sendBuffer.Add(($"blob:{hash}", obj.filePath));
Expand Down Expand Up @@ -206,7 +206,7 @@ public void SaveObject(string id, string serializedObject)
{
if (IsInErrorState)
{
return;
throw new TransportException($"{TransportName} transport failed", _exception);
}

_sendBuffer.Add((id, serializedObject));
Expand Down
70 changes: 70 additions & 0 deletions tests/Speckle.LongSendProblem/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
using Speckle.Objects.Geometry;
using Speckle.Sdk;
using Speckle.Sdk.Api;
using Speckle.Sdk.Credentials;
using Speckle.Sdk.Host;
using Speckle.Sdk.Logging;
using Speckle.Sdk.Models;
using Speckle.Sdk.Transports;

//Replace this with a brand new model URL
Uri modelUrl = new("https://testing1.speckle.dev/projects/cdedc63e6d/models/2d68380f1d");

//Keep this as-is, copy the Data.db into %appdata%/longsendtest/Data.db
const string OBJECT_ID = "5cbf84a0061172102ef8a66ae914f232";

SetupSpeckle();
var testData = await GetSampleData(OBJECT_ID);
await SendToSpeckle(testData, modelUrl);

return;

static async Task SendToSpeckle(Base testData, Uri modelUrl)
{
SpeckleLog.Logger.Information("Starting Long Send Test Send");
var destinationTransport = await GetDestination(modelUrl);

var (res, _) = await Operations.Send(testData, new[] { destinationTransport });
SpeckleLog.Logger.Information("Starting Send was successful: {objectId}", res);
}

static async Task<ITransport> GetDestination(Uri modelUrl)
{
StreamWrapper sw = new(modelUrl.ToString());
var acc = await sw.GetAccount();
return new ServerTransport(acc, sw.StreamId);
}

static async Task<Base> GetSampleData(string objectId)
{
SpeckleLog.Logger.Information("Gathering Sample Data Set");
using SQLiteTransport source = new(SpecklePathProvider.UserApplicationDataPath(), "longsendtest");
MemoryTransport memoryTransport = new();
return await Operations.Receive(objectId, source, memoryTransport);
}

static void SetupSpeckle()
{
TypeLoader.Initialize(typeof(Base).Assembly, typeof(Point).Assembly);
var config = new SpeckleConfiguration(
new("Long Send Problem Test Script", "LongSend"),
default,
new(
MinimumLevel: SpeckleLogLevel.Debug,
Console: true,
File: new(Path: "SpeckleCoreLog.txt"),
Otel: new(
Endpoint: "https://seq.speckle.systems/ingest/otlp/v1/logs",
Headers: new() { { "X-Seq-ApiKey", "agZqxG4jQELxQQXh0iZQ" } }
)
),
new(
Console: false,
Otel: new(
Endpoint: "https://seq.speckle.systems/ingest/otlp/v1/traces",
Headers: new() { { "X-Seq-ApiKey", "agZqxG4jQELxQQXh0iZQ" } }
)
)
);
Setup.Initialize(config);
}
14 changes: 14 additions & 0 deletions tests/Speckle.LongSendProblem/Speckle.LongSendProblem.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\Speckle.Objects\Speckle.Objects.csproj" />
</ItemGroup>

</Project>
Loading

0 comments on commit 9916048

Please sign in to comment.