Skip to content

Commit

Permalink
retry policy for server api (#53)
Browse files Browse the repository at this point in the history
  • Loading branch information
JR-Morgan authored Jul 26, 2024
1 parent 02f75c7 commit 1f92335
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 90 deletions.
9 changes: 0 additions & 9 deletions src/Speckle.Core/Api/GraphQL/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -92,15 +92,6 @@ public void Dispose()

internal async Task<T> ExecuteWithResiliencePolicies<T>(Func<Task<T>> func)
{
// TODO: handle these in the HttpClient factory with a custom RequestHandler class
// 408 Request Timeout
// 425 Too Early
// 429 Too Many Requests
// 500 Internal Server Error
// 502 Bad Gateway
// 503 Service Unavailable
// 504 Gateway Timeout

var delay = Backoff.DecorrelatedJitterBackoffV2(TimeSpan.FromSeconds(1), 5);
var graphqlRetry = Policy
.Handle<SpeckleGraphQLInternalErrorException>()
Expand Down
27 changes: 12 additions & 15 deletions src/Speckle.Core/Helpers/Http.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,10 @@ public static AsyncRetryPolicy<HttpResponseMessage> HttpAsyncPolicy(IEnumerable<
.HandleTransientHttpError()
.WaitAndRetryAsync(
delay ?? DefaultDelay(),
(ex, timeSpan, retryAttempt, context) => {
//context.Remove("retryCount");
//context.Add("retryCount", retryAttempt);
//Log.Information(
// ex.Exception,
// "The http request failed with {exceptionType} exception retrying after {cooldown} milliseconds. This is retry attempt {retryAttempt}",
// ex.GetType().Name,
// timeSpan.TotalSeconds * 1000,
// retryAttempt
//);
(ex, timeSpan, retryAttempt, context) =>
{
context.Remove("retryCount");
context.Add("retryCount", retryAttempt);
}
);
}
Expand Down Expand Up @@ -207,14 +201,16 @@ CancellationToken cancellationToken
using (LogContext.PushProperty("httpMethod", request.Method))
{
SpeckleLog.Logger.Debug("Starting execution of http request to {targetUrl}", request.RequestUri);
var timer = new Stopwatch();
timer.Start();
var timer = Stopwatch.StartNew();

context.Add("retryCount", 0);

request.Headers.Add("x-request-id", context.CorrelationId.ToString());

var policyResult = await Http.HttpAsyncPolicy(_delay)
.ExecuteAndCaptureAsync(
ctx =>
{
request.Headers.Add("x-request-id", ctx.CorrelationId.ToString());
return base.SendAsync(request, cancellationToken);
},
context
Expand All @@ -226,14 +222,15 @@ CancellationToken cancellationToken
SpeckleLog
.Logger.ForContext("ExceptionType", policyResult.FinalException?.GetType())
.Information(
"Execution of http request to {httpScheme}://{hostUrl}/{relativeUrl} {resultStatus} with {httpStatusCode} after {elapsed} seconds and {retryCount} retries",
"Execution of http request to {httpScheme}://{hostUrl}{relativeUrl} {resultStatus} with {httpStatusCode} after {elapsed} seconds and {retryCount} retries. Request correlation ID: {correlationId}",
request.RequestUri.Scheme,
request.RequestUri.Host,
request.RequestUri.PathAndQuery,
status,
policyResult.Result?.StatusCode,
timer.Elapsed.TotalSeconds,
retryCount ?? 0
retryCount ?? 0,
context.CorrelationId.ToString()
);
if (policyResult.Outcome == OutcomeType.Successful)
{
Expand Down
2 changes: 1 addition & 1 deletion src/Speckle.Core/Transports/IServerTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ public interface IServerTransport : IDisposable, ITransport, IBlobCapableTranspo
{
int TotalSentBytes { get; }
Credentials.Account Account { get; }
string BaseUri { get; }
Uri BaseUri { get; }
string StreamId { get; }
int TimeoutSeconds { get; set; }
}
4 changes: 2 additions & 2 deletions src/Speckle.Core/Transports/ServerUtils/ParallelServerAPI.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ internal class ParallelServerApi : ParallelOperationExecutor<ServerApiOperation>
{
private readonly string _authToken;

private readonly string _baseUri;
private readonly Uri _baseUri;

private readonly object _callbackLock = new();

private readonly int _timeoutSeconds;

public ParallelServerApi(
string baseUri,
Uri baseUri,
string authorizationToken,
string blobStorageFolder,
int timeoutSeconds,
Expand Down
73 changes: 12 additions & 61 deletions src/Speckle.Core/Transports/ServerUtils/ServerAPI.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,12 @@ public sealed class ServerApi : IDisposable, IServerApi

private const int MAX_REQUEST_SIZE = 100_000_000;

private const int RETRY_COUNT = 3;
private static readonly HashSet<int> s_retryCodes = new() { 408, 502, 503, 504 };
private static readonly char[] s_separator = { '\t' };
private static readonly string[] s_filenameSeparator = { "filename=" };

private readonly HttpClient _client;

public ServerApi(string baseUri, string? authorizationToken, string blobStorageFolder, int timeoutSeconds = 60)
public ServerApi(Uri baseUri, string? authorizationToken, string blobStorageFolder, int timeoutSeconds = 60)
{
CancellationToken = CancellationToken.None;

Expand All @@ -44,7 +42,7 @@ public ServerApi(string baseUri, string? authorizationToken, string blobStorageF
new SpeckleHttpClientHandler { AutomaticDecompression = DecompressionMethods.GZip }
);

_client.BaseAddress = new Uri(baseUri);
_client.BaseAddress = baseUri;
_client.Timeout = TimeSpan.FromSeconds(timeoutSeconds);

Http.AddAuthHeader(_client, authorizationToken);
Expand Down Expand Up @@ -77,13 +75,9 @@ public async Task<string> DownloadSingleObject(string streamId, string objectId)
Method = HttpMethod.Get
};

HttpResponseMessage rootHttpResponse;
do
{
rootHttpResponse = await _client
.SendAsync(rootHttpMessage, HttpCompletionOption.ResponseContentRead, CancellationToken)
.ConfigureAwait(false);
} while (ShouldRetry(rootHttpResponse));
HttpResponseMessage rootHttpResponse = await _client
.SendAsync(rootHttpMessage, HttpCompletionOption.ResponseContentRead, CancellationToken)
.ConfigureAwait(false);

rootHttpResponse.EnsureSuccessStatusCode();

Expand Down Expand Up @@ -256,11 +250,7 @@ public async Task UploadBlobs(string streamId, IReadOnlyList<(string, string)> o

try
{
HttpResponseMessage response;
do
{
response = await _client.SendAsync(message, CancellationToken).ConfigureAwait(false);
} while (ShouldRetry(response)); //TODO: can we get rid of this now we have polly?
HttpResponseMessage response = await _client.SendAsync(message, CancellationToken).ConfigureAwait(false);

response.EnsureSuccessStatusCode();

Expand Down Expand Up @@ -335,13 +325,9 @@ CbObjectDownloaded onObjectCallback
childrenHttpMessage.Content = new StringContent(serializedPayload, Encoding.UTF8, "application/json");
childrenHttpMessage.Headers.Add("Accept", "text/plain");

HttpResponseMessage childrenHttpResponse;
do
{
childrenHttpResponse = await _client
.SendAsync(childrenHttpMessage, HttpCompletionOption.ResponseHeadersRead, CancellationToken)
.ConfigureAwait(false);
} while (ShouldRetry(childrenHttpResponse));
HttpResponseMessage childrenHttpResponse = await _client
.SendAsync(childrenHttpMessage, HttpCompletionOption.ResponseHeadersRead, CancellationToken)
.ConfigureAwait(false);

childrenHttpResponse.EnsureSuccessStatusCode();

Expand Down Expand Up @@ -370,12 +356,8 @@ private async Task<Dictionary<string, bool>> HasObjectsImpl(string streamId, IRe
string serializedPayload = JsonConvert.SerializeObject(payload);
var uri = new Uri($"/api/diff/{streamId}", UriKind.Relative);

HttpResponseMessage response;
using StringContent stringContent = new(serializedPayload, Encoding.UTF8, "application/json");
do
{
response = await _client.PostAsync(uri, stringContent, CancellationToken).ConfigureAwait(false);
} while (ShouldRetry(response));
HttpResponseMessage response = await _client.PostAsync(uri, stringContent, CancellationToken).ConfigureAwait(false);

response.EnsureSuccessStatusCode();

Expand Down Expand Up @@ -434,11 +416,7 @@ private async Task UploadObjectsImpl(string streamId, List<List<(string, string)
}
}
message.Content = multipart;
HttpResponseMessage response;
do
{
response = await _client.SendAsync(message, CancellationToken).ConfigureAwait(false);
} while (ShouldRetry(response));
HttpResponseMessage response = await _client.SendAsync(message, CancellationToken).ConfigureAwait(false);

response.EnsureSuccessStatusCode();

Expand All @@ -454,12 +432,7 @@ public async Task<List<string>> HasBlobs(string streamId, IReadOnlyList<string>

using StringContent stringContent = new(payload, Encoding.UTF8, "application/json");

//TODO: can we get rid of this now we have polly?
HttpResponseMessage response;
do
{
response = await _client.PostAsync(uri, stringContent, CancellationToken).ConfigureAwait(false);
} while (ShouldRetry(response));
HttpResponseMessage response = await _client.PostAsync(uri, stringContent, CancellationToken).ConfigureAwait(false);

response.EnsureSuccessStatusCode();

Expand All @@ -473,28 +446,6 @@ public async Task<List<string>> HasBlobs(string streamId, IReadOnlyList<string>
return parsed;
}

//TODO: can we get rid of this now we have polly?
private bool ShouldRetry(HttpResponseMessage? serverResponse)
{
if (serverResponse == null)
{
return true;
}

if (!s_retryCodes.Contains((int)serverResponse.StatusCode))
{
return false;
}

if (RetriedCount >= RETRY_COUNT)
{
return false;
}

RetriedCount += 1;
return true;
}

private sealed class BlobUploadResult
{
public List<BlobUploadResultItem> uploadResults { get; set; }
Expand Down
4 changes: 2 additions & 2 deletions src/Speckle.Core/Transports/ServerV2.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public ServerTransport(Account account, string streamId, int timeoutSeconds = 60
}

Account = account;
BaseUri = account.serverInfo.url;
BaseUri = new(account.serverInfo.url);
StreamId = streamId;
AuthorizationToken = account.token;
TimeoutSeconds = timeoutSeconds;
Expand All @@ -49,7 +49,7 @@ public ServerTransport(Account account, string streamId, int timeoutSeconds = 60
public int TotalSentBytes { get; private set; }

public Account Account { get; }
public string BaseUri { get; }
public Uri BaseUri { get; }
public string StreamId { get; internal set; }

public int TimeoutSeconds { get; set; }
Expand Down

0 comments on commit 1f92335

Please sign in to comment.