From dc4da4907851ef581083926da83f24be7b37f1eb Mon Sep 17 00:00:00 2001 From: Adam Hathcock Date: Thu, 15 Aug 2024 09:43:41 +0100 Subject: [PATCH] Add Progress for transfers (#74) * progress intermediate commit * add progress for download * remove unused code * remove batch sent callbacks * multi-threaded deserialize works * Progress for download and deserialization * Fix tests * Have less indeterminate deserialization * fix deserialization * make download faster with buffered stream * put local receive back * remove unused callback * fmt --- src/Speckle.Sdk/Api/GraphQL/Client.cs | 2 +- src/Speckle.Sdk/Api/Helpers.cs | 117 ++--------------- .../Api/Operations/Operations.Receive.cs | 2 +- .../Api/Operations/Operations.Send.cs | 14 +- .../Api/Operations/Operations.Serialize.cs | 5 +- src/Speckle.Sdk/Api/Operations/Operations.cs | 15 +-- src/Speckle.Sdk/Credentials/AccountManager.cs | 12 +- src/Speckle.Sdk/Helpers/Http.cs | 65 ++-------- .../Helpers/SpeckleHttpClientHandler.cs | 59 +++++++++ src/Speckle.Sdk/Logging/Analytics.cs | 4 +- .../Serialisation/BaseObjectDeserializerV2.cs | 37 +++--- .../Serialisation/BaseObjectSerializerV2.cs | 36 ++++-- .../DeserializationWorkerThreads.cs | 13 +- src/Speckle.Sdk/Transports/DiskTransport.cs | 4 +- .../Transports/IServerTransport.cs | 1 - src/Speckle.Sdk/Transports/ITransport.cs | 19 ++- src/Speckle.Sdk/Transports/Memory.cs | 4 +- src/Speckle.Sdk/Transports/SQLite.cs | 4 +- .../ServerUtils/HttpHeaderExtensions.cs | 14 ++ .../Transports/ServerUtils/IServerApi.cs | 24 +++- .../ServerUtils/ParallelServerAPI.cs | 76 ++++++----- .../Transports/ServerUtils/ProgressContent.cs | 45 +++++++ .../Transports/ServerUtils/ProgressStream.cs | 38 ++++++ .../Transports/ServerUtils/ServerAPI.cs | 122 ++++++++++-------- src/Speckle.Sdk/Transports/ServerV2.cs | 46 ++----- .../Transports/TransportHelpers.cs | 5 +- .../TestTransport.cs | 2 +- .../Api/Operations/SendReceiveLocal.cs | 14 +- .../Transports/TransportTests.cs | 2 +- 29 files changed, 412 insertions(+), 389 deletions(-) create mode 100644 src/Speckle.Sdk/Helpers/SpeckleHttpClientHandler.cs create mode 100644 src/Speckle.Sdk/Transports/ServerUtils/HttpHeaderExtensions.cs create mode 100644 src/Speckle.Sdk/Transports/ServerUtils/ProgressContent.cs create mode 100644 src/Speckle.Sdk/Transports/ServerUtils/ProgressStream.cs diff --git a/src/Speckle.Sdk/Api/GraphQL/Client.cs b/src/Speckle.Sdk/Api/GraphQL/Client.cs index 9a6af8a5..0bb3627a 100644 --- a/src/Speckle.Sdk/Api/GraphQL/Client.cs +++ b/src/Speckle.Sdk/Api/GraphQL/Client.cs @@ -330,7 +330,7 @@ private static GraphQLHttpClient CreateGraphQLClient(Account account, HttpClient private static HttpClient CreateHttpClient(Account account) { - var httpClient = Http.GetHttpProxyClient(null, TimeSpan.FromSeconds(30)); + var httpClient = Http.GetHttpProxyClient(timeout: TimeSpan.FromSeconds(30)); Http.AddAuthHeader(httpClient, account.token); httpClient.DefaultRequestHeaders.Add("apollographql-client-name", Setup.ApplicationVersion); diff --git a/src/Speckle.Sdk/Api/Helpers.cs b/src/Speckle.Sdk/Api/Helpers.cs index 30e1df2f..a0d1b038 100644 --- a/src/Speckle.Sdk/Api/Helpers.cs +++ b/src/Speckle.Sdk/Api/Helpers.cs @@ -1,10 +1,10 @@ -#nullable disable using System.Collections.Concurrent; using System.Diagnostics.Contracts; using System.Reflection; using System.Runtime.InteropServices; using Speckle.Newtonsoft.Json; using Speckle.Sdk.Api.GraphQL.Models; +using Speckle.Sdk.Common; using Speckle.Sdk.Credentials; using Speckle.Sdk.Helpers; using Speckle.Sdk.Host; @@ -30,9 +30,9 @@ public static class Helpers public static async Task Receive( this IServerTransportFactory serverTransportFactory, string stream, - Account account = null, - Action> onProgressAction = null, - Action onTotalChildrenCountKnown = null + Account? account = null, + Action>? onProgressAction = null, + Action? onTotalChildrenCountKnown = null ) { var sw = new StreamWrapper(stream); @@ -61,17 +61,17 @@ public static async Task Receive( using var transport = serverTransportFactory.Create(client.Account, sw.StreamId); string objectId = ""; - Commit commit = null; + Commit? commit = null; //OBJECT URL if (!string.IsNullOrEmpty(sw.ObjectId)) { - objectId = sw.ObjectId; + objectId = sw.ObjectId.NotNull(); } //COMMIT URL else if (!string.IsNullOrEmpty(sw.CommitId)) { - commit = await client.CommitGet(sw.StreamId, sw.CommitId).ConfigureAwait(false); + commit = await client.CommitGet(sw.StreamId, sw.CommitId.NotNull()).ConfigureAwait(false); objectId = commit.referencedObject; } //BRANCH URL OR STREAM URL @@ -79,7 +79,7 @@ public static async Task Receive( { var branchName = string.IsNullOrEmpty(sw.BranchName) ? "main" : sw.BranchName; - var branch = await client.BranchGet(sw.StreamId, branchName, 1).ConfigureAwait(false); + var branch = await client.BranchGet(sw.StreamId, branchName.NotNull(), 1).ConfigureAwait(false); if (branch.commits.items.Count == 0) { throw new SpeckleException("The selected branch has no commits."); @@ -94,7 +94,7 @@ public static async Task Receive( Analytics.Events.Receive, new Dictionary { - { "sourceHostApp", HostApplications.GetHostAppFromString(commit.sourceApplication).Slug }, + { "sourceHostApp", HostApplications.GetHostAppFromString(commit.NotNull().sourceApplication).Slug }, { "sourceHostAppVersion", commit.sourceApplication } } ); @@ -129,105 +129,6 @@ await client return receiveRes; } - /// - /// Helper method to Send to a Speckle Server. - /// - /// Stream URL or Id to send to. If the URL contains branchName, commitId or objectId those will be used, otherwise the latest commit from main will be received. - /// Data to send - /// Account to use. If not provided the default account will be used. - /// Toggle for the default cache. If set to false, it will only send to the provided transports. - /// Action invoked on progress iterations. - /// - public static async Task Send( - this IServerTransportFactory serverTransportFactory, - string stream, - Base data, - string message = "No message", - string sourceApplication = ".net", - int totalChildrenCount = 0, - Account account = null, - bool useDefaultCache = true, - Action> onProgressAction = null - ) - { - var sw = new StreamWrapper(stream); - - using var client = new Client(account ?? await sw.GetAccount().ConfigureAwait(false)); - - using var transport = serverTransportFactory.Create(client.Account, sw.StreamId); - var branchName = string.IsNullOrEmpty(sw.BranchName) ? "main" : sw.BranchName; - - var (objectId, _) = await Operations.Send(data, transport, useDefaultCache, onProgressAction).ConfigureAwait(false); - - Analytics.TrackEvent(client.Account, Analytics.Events.Send); - - return await client - .CommitCreate( - new CommitCreateInput - { - streamId = sw.StreamId, - branchName = branchName, - objectId = objectId, - message = message, - sourceApplication = sourceApplication, - totalChildrenCount = totalChildrenCount - } - ) - .ConfigureAwait(false); - } - - /// - /// - /// - /// The connector slug eg. revit, rhino, etc - /// - public static async Task IsConnectorUpdateAvailable(string slug) - { - //when debugging the version is not correct, so don't bother - if (!Analytics.IsReleaseMode) - { - return false; - } - - try - { - using HttpClient client = Http.GetHttpProxyClient(); - var response = await client.GetStringAsync($"{FEEDS_ENDPOINT}/{slug}.json").ConfigureAwait(false); - var connector = JsonConvert.DeserializeObject(response); - - var os = Os.Win; //TODO: This won't work for linux - if (RuntimeInformation.IsOSPlatform(OSPlatform.OSX)) - { - os = Os.OSX; - } - - var versions = connector.Versions.Where(x => x.Os == os).OrderByDescending(x => x.Date).ToList(); - var stables = versions.Where(x => !x.Prerelease).ToArray(); - if (stables.Length == 0) - { - return false; - } - - var latestVersion = new System.Version(stables.First().Number); - - var currentVersion = Assembly.GetAssembly(typeof(Helpers)).GetName().Version; - - if (latestVersion > currentVersion) - { - return true; - } - } - catch (Exception ex) when (!ex.IsFatal()) - { - //.Log.ForContext("slug", slug) - SpeckleLog.Logger.Warning(ex, "Failed to check for connector updates"); - } - - return false; - } - -#nullable enable - /// /// value to fallback to if the given is public static string TimeAgo(DateTime? timestamp, string fallback = "Never") diff --git a/src/Speckle.Sdk/Api/Operations/Operations.Receive.cs b/src/Speckle.Sdk/Api/Operations/Operations.Receive.cs index acf36744..6f9fd10b 100644 --- a/src/Speckle.Sdk/Api/Operations/Operations.Receive.cs +++ b/src/Speckle.Sdk/Api/Operations/Operations.Receive.cs @@ -34,7 +34,7 @@ public static async Task Receive( string objectId, ITransport? remoteTransport = null, ITransport? localTransport = null, - Action>? onProgressAction = null, + Action>? onProgressAction = null, Action? onTotalChildrenCountKnown = null, CancellationToken cancellationToken = default ) diff --git a/src/Speckle.Sdk/Api/Operations/Operations.Send.cs b/src/Speckle.Sdk/Api/Operations/Operations.Send.cs index 5dc876a3..8334a690 100644 --- a/src/Speckle.Sdk/Api/Operations/Operations.Send.cs +++ b/src/Speckle.Sdk/Api/Operations/Operations.Send.cs @@ -14,7 +14,7 @@ public static partial class Operations /// Sends a Speckle Object to the provided and (optionally) the default local cache /// /// - /// + /// /// When , an additional will be included /// The or was /// @@ -25,7 +25,7 @@ public static partial class Operations Base value, ITransport transport, bool useDefaultCache, - Action>? onProgressAction = null, + Action>? onProgressAction = null, CancellationToken cancellationToken = default ) { @@ -61,7 +61,7 @@ public static partial class Operations public static async Task<(string rootObjId, IReadOnlyDictionary convertedReferences)> Send( Base value, IReadOnlyCollection transports, - Action>? onProgressAction = null, + Action>? onProgressAction = null, CancellationToken cancellationToken = default ) { @@ -75,11 +75,8 @@ public static partial class Operations throw new ArgumentException("Expected at least on transport to be specified", nameof(transports)); } - var transportContext = transports.ToDictionary(t => t.TransportName, t => t.TransportContext); - // make sure all logs in the operation have the proper context using var activity = SpeckleActivityFactory.Start(); - activity?.SetTag("transportContext", transportContext); activity?.SetTag("correlationId", Guid.NewGuid().ToString()); { var sendTimer = Stopwatch.StartNew(); @@ -108,8 +105,7 @@ public static partial class Operations ); activity?.SetTag("serializerElapsed", serializerV2.Elapsed); SpeckleLog.Logger.Information( - "Finished sending {objectCount} objects after {elapsed}, result {objectId}", - transports.Max(t => t.SavedObjectCount), + "Finished sending objects after {elapsed}, result {objectId}", sendTimer.Elapsed.TotalSeconds, rootObjectId ); @@ -140,7 +136,7 @@ public static partial class Operations } } - /// + /// internal static async Task SerializerSend( Base value, BaseObjectSerializerV2 serializer, diff --git a/src/Speckle.Sdk/Api/Operations/Operations.Serialize.cs b/src/Speckle.Sdk/Api/Operations/Operations.Serialize.cs index b88adf48..41678963 100644 --- a/src/Speckle.Sdk/Api/Operations/Operations.Serialize.cs +++ b/src/Speckle.Sdk/Api/Operations/Operations.Serialize.cs @@ -2,6 +2,7 @@ using Speckle.Sdk.Logging; using Speckle.Sdk.Models; using Speckle.Sdk.Serialisation; +using Speckle.Sdk.Transports; namespace Speckle.Sdk.Api; @@ -13,7 +14,7 @@ public static partial class Operations /// /// If you want to save and persist an object to Speckle Transport or Server, /// please use any of the "Send" methods. - /// + /// /// /// The object to serialise /// @@ -27,7 +28,7 @@ public static string Serialize(Base value, CancellationToken cancellationToken = /// /// Note: if you want to pull an object from a Speckle Transport or Server, /// please use - /// + /// /// /// The json string representation of a speckle object that you want to deserialize /// diff --git a/src/Speckle.Sdk/Api/Operations/Operations.cs b/src/Speckle.Sdk/Api/Operations/Operations.cs index 88684658..2a961774 100644 --- a/src/Speckle.Sdk/Api/Operations/Operations.cs +++ b/src/Speckle.Sdk/Api/Operations/Operations.cs @@ -1,4 +1,5 @@ using System.Collections.Concurrent; +using Speckle.Sdk.Transports; namespace Speckle.Sdk.Api; @@ -14,23 +15,17 @@ public static partial class Operations /// /// /// - private static Action? GetInternalProgressAction( - Action>? onProgressAction - ) + private static Action? GetInternalProgressAction(Action>? onProgressAction) { if (onProgressAction is null) { return null; } - var localProgressDict = new ConcurrentDictionary(); - - return (name, processed) => + return (args) => { - if (!localProgressDict.TryAdd(name, processed)) - { - localProgressDict[name] += processed; - } + var localProgressDict = new ConcurrentBag(); + localProgressDict.Add(args); onProgressAction.Invoke(localProgressDict); }; diff --git a/src/Speckle.Sdk/Credentials/AccountManager.cs b/src/Speckle.Sdk/Credentials/AccountManager.cs index e817d913..3e01c917 100644 --- a/src/Speckle.Sdk/Credentials/AccountManager.cs +++ b/src/Speckle.Sdk/Credentials/AccountManager.cs @@ -38,7 +38,7 @@ public static class AccountManager /// public static async Task GetServerInfo(Uri server, CancellationToken cancellationToken = default) { - using var httpClient = Http.GetHttpProxyClient(); + using var httpClient = Http.GetHttpProxyClient(null, null); using var gqlClient = new GraphQLHttpClient( new GraphQLHttpClientOptions @@ -102,7 +102,7 @@ public static async Task GetUserInfo( CancellationToken cancellationToken = default ) { - using var httpClient = Http.GetHttpProxyClient(); + using var httpClient = Http.GetHttpProxyClient(null, null); Http.AddAuthHeader(httpClient, token); using var gqlClient = new GraphQLHttpClient( @@ -149,7 +149,7 @@ internal static async Task GetUserServerInfo( { try { - using var httpClient = Http.GetHttpProxyClient(); + using var httpClient = Http.GetHttpProxyClient(null, null); Http.AddAuthHeader(httpClient, token); using var client = new GraphQLHttpClient( @@ -751,7 +751,7 @@ private static async Task GetToken(string accessCode, str { try { - using var client = Http.GetHttpProxyClient(); + using var client = Http.GetHttpProxyClient(null, null); var body = new { @@ -779,7 +779,7 @@ private static async Task GetRefreshedToken(string refres { try { - using var client = Http.GetHttpProxyClient(); + using var client = Http.GetHttpProxyClient(null, null); var body = new { @@ -811,7 +811,7 @@ private static async Task GetRefreshedToken(string refres /// Request to failed to send or response was not successful private static async Task IsFrontend2Server(Uri server) { - using var httpClient = Http.GetHttpProxyClient(); + using var httpClient = Http.GetHttpProxyClient(null, null); var response = await Http.HttpPing(server).ConfigureAwait(false); diff --git a/src/Speckle.Sdk/Helpers/Http.cs b/src/Speckle.Sdk/Helpers/Http.cs index 10fae8c1..63e345e7 100644 --- a/src/Speckle.Sdk/Helpers/Http.cs +++ b/src/Speckle.Sdk/Helpers/Http.cs @@ -128,7 +128,7 @@ public static async Task HttpPing(Uri uri) { try { - using var httpClient = GetHttpProxyClient(); + using var httpClient = GetHttpProxyClient(null, null); HttpResponseMessage response = await httpClient.GetAsync(uri).ConfigureAwait(false); response.EnsureSuccessStatusCode(); SpeckleLog.Logger.Information("Successfully pinged {uri}", uri); @@ -141,13 +141,17 @@ public static async Task HttpPing(Uri uri) } } - public static HttpClient GetHttpProxyClient(SpeckleHttpClientHandler? handler = null, TimeSpan? timeout = null) + public static HttpClient GetHttpProxyClient( + SpeckleHttpClientHandler? speckleHttpClientHandler = null, + TimeSpan? timeout = null + ) { IWebProxy proxy = WebRequest.GetSystemWebProxy(); proxy.Credentials = CredentialCache.DefaultCredentials; - handler ??= new SpeckleHttpClientHandler(); - var client = new HttpClient(handler) { Timeout = timeout ?? TimeSpan.FromSeconds(100) }; + speckleHttpClientHandler ??= new SpeckleHttpClientHandler(new HttpClientHandler()); + + var client = new HttpClient(speckleHttpClientHandler) { Timeout = timeout ?? TimeSpan.FromSeconds(100) }; return client; } @@ -171,56 +175,3 @@ public static void AddAuthHeader(HttpClient client, string? authToken) } } } - -public sealed class SpeckleHttpClientHandler : HttpClientHandler -{ - private readonly IEnumerable _delay; - - public SpeckleHttpClientHandler(IEnumerable? delay = null) - { - _delay = delay ?? Http.DefaultDelay(); - } - - /// requested cancel - /// Send request failed - protected override async Task SendAsync( - HttpRequestMessage request, - CancellationToken cancellationToken - ) - { - // this is a preliminary client server correlation implementation - // refactor this, when we have a better observability stack - var context = new Context(); - using var activity = SpeckleActivityFactory.Start("Http Send"); - { - activity?.SetTag("http.url", request.RequestUri); - context.Add("retryCount", 0); - - request.Headers.Add("x-request-id", context.CorrelationId.ToString()); - - var policyResult = await Http.HttpAsyncPolicy(_delay) - .ExecuteAndCaptureAsync( - ctx => - { - return base.SendAsync(request, cancellationToken); - }, - context - ) - .ConfigureAwait(false); - context.TryGetValue("retryCount", out var retryCount); - activity?.SetTag("retryCount", retryCount); - if (policyResult.Outcome == OutcomeType.Successful) - { - return policyResult.Result.NotNull(); - } - - // if the policy failed due to a cancellation, AND it was our cancellation token, then don't wrap the exception, and rethrow an new cancellation - if (policyResult.FinalException is OperationCanceledException) - { - cancellationToken.ThrowIfCancellationRequested(); - } - - throw new HttpRequestException("Policy Failed", policyResult.FinalException); - } - } -} diff --git a/src/Speckle.Sdk/Helpers/SpeckleHttpClientHandler.cs b/src/Speckle.Sdk/Helpers/SpeckleHttpClientHandler.cs new file mode 100644 index 00000000..cce3a46f --- /dev/null +++ b/src/Speckle.Sdk/Helpers/SpeckleHttpClientHandler.cs @@ -0,0 +1,59 @@ +using Polly; +using Speckle.Sdk.Common; +using Speckle.Sdk.Logging; + +namespace Speckle.Sdk.Helpers; + +public sealed class SpeckleHttpClientHandler : DelegatingHandler +{ + private readonly IEnumerable _delay; + + public SpeckleHttpClientHandler(HttpMessageHandler innerhandler, IEnumerable? delay = null) + : base(innerhandler) + { + _delay = delay ?? Http.DefaultDelay(); + } + + /// requested cancel + /// Send request failed + protected override async Task SendAsync( + HttpRequestMessage request, + CancellationToken cancellationToken + ) + { + // this is a preliminary client server correlation implementation + // refactor this, when we have a better observability stack + var context = new Context(); + using var activity = SpeckleActivityFactory.Start("Http Send"); + { + activity?.SetTag("http.url", request.RequestUri); + context.Add("retryCount", 0); + + request.Headers.Add("x-request-id", context.CorrelationId.ToString()); + + var policyResult = await Http.HttpAsyncPolicy(_delay) + .ExecuteAndCaptureAsync( + ctx => + { + return base.SendAsync(request, cancellationToken); + }, + context + ) + .ConfigureAwait(false); + context.TryGetValue("retryCount", out var retryCount); + activity?.SetTag("retryCount", retryCount); + if (policyResult.Outcome == OutcomeType.Successful) + { + return policyResult.Result.NotNull(); + } + + // if the policy failed due to a cancellation, AND it was our cancellation token, then don't wrap the exception, and rethrow an new cancellation + if (policyResult.FinalException is OperationCanceledException) + { + cancellationToken.ThrowIfCancellationRequested(); + } + + throw new HttpRequestException("Policy Failed", policyResult.FinalException); + } + } +} diff --git a/src/Speckle.Sdk/Logging/Analytics.cs b/src/Speckle.Sdk/Logging/Analytics.cs index 7e35564c..8bb99b1a 100644 --- a/src/Speckle.Sdk/Logging/Analytics.cs +++ b/src/Speckle.Sdk/Logging/Analytics.cs @@ -274,7 +274,7 @@ internal static void AddConnectorToProfile(string hashedEmail, string connector) string json = JsonConvert.SerializeObject(data); var query = new StreamContent(new MemoryStream(Encoding.UTF8.GetBytes("data=" + HttpUtility.UrlEncode(json)))); - using HttpClient client = Http.GetHttpProxyClient(); + using HttpClient client = Http.GetHttpProxyClient(null, null); client.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("text/plain")); query.Headers.ContentType = new MediaTypeHeaderValue("application/json"); var res = await client.PostAsync(MIXPANEL_SERVER + "/engage#profile-union", query).ConfigureAwait(false); @@ -306,7 +306,7 @@ internal static void IdentifyProfile(string hashedEmail, string connector) string json = JsonConvert.SerializeObject(data); var query = new StreamContent(new MemoryStream(Encoding.UTF8.GetBytes("data=" + HttpUtility.UrlEncode(json)))); - using HttpClient client = Http.GetHttpProxyClient(); + using HttpClient client = Http.GetHttpProxyClient(null, null); client.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("text/plain")); query.Headers.ContentType = new MediaTypeHeaderValue("application/json"); var res = await client.PostAsync(MIXPANEL_SERVER + "/engage#profile-set", query).ConfigureAwait(false); diff --git a/src/Speckle.Sdk/Serialisation/BaseObjectDeserializerV2.cs b/src/Speckle.Sdk/Serialisation/BaseObjectDeserializerV2.cs index 90d5c5e2..bd3defb5 100644 --- a/src/Speckle.Sdk/Serialisation/BaseObjectDeserializerV2.cs +++ b/src/Speckle.Sdk/Serialisation/BaseObjectDeserializerV2.cs @@ -33,7 +33,7 @@ public sealed class BaseObjectDeserializerV2 /// public ITransport ReadTransport { get; set; } - public Action? OnProgressAction { get; set; } + public Action? OnProgressAction { get; set; } public string? BlobStorageFolder { get; set; } public TimeSpan Elapsed { get; private set; } @@ -66,6 +66,7 @@ public Base Deserialize(string rootObjectJson) List<(string, int)> closures = GetClosures(rootObjectJson); closures.Sort((a, b) => b.Item2.CompareTo(a.Item2)); + int i = 0; foreach (var closure in closures) { string objId = closure.Item1; @@ -77,7 +78,7 @@ public Base Deserialize(string rootObjectJson) //but adding throw here breaks blobs tests, see CNX-8541 stopwatch.Start(); - object? deserializedOrPromise = DeserializeTransportObjectProxy(objJson); + object? deserializedOrPromise = DeserializeTransportObjectProxy(objJson, i++, closures.Count); lock (_deserializedObjects) { _deserializedObjects[objId] = deserializedOrPromise; @@ -87,7 +88,7 @@ public Base Deserialize(string rootObjectJson) object? ret; try { - ret = DeserializeTransportObject(rootObjectJson); + ret = DeserializeTransportObject(rootObjectJson, null, null); } catch (JsonReaderException ex) { @@ -140,29 +141,29 @@ public Base Deserialize(string rootObjectJson) } } - private object? DeserializeTransportObjectProxy(string? objectJson) + private object? DeserializeTransportObjectProxy(string? objectJson, long? current, long? total) { if (objectJson is null) { return null; } // Try background work - Task? bgResult = _workerThreads?.TryStartTask(WorkerThreadTaskType.Deserialize, objectJson); //BUG: Because we don't guarantee this task will ever be awaited, this may lead to unobserved exceptions! + Task? bgResult = _workerThreads?.TryStartTask( + WorkerThreadTaskType.Deserialize, + objectJson, + current, + total + ); //BUG: Because we don't guarantee this task will ever be awaited, this may lead to unobserved exceptions! if (bgResult != null) { return bgResult; } // SyncS - return DeserializeTransportObject(objectJson); + return DeserializeTransportObject(objectJson, current, total); } - /// - /// The deserialized object - /// was null - /// was not valid JSON - /// Failed to deserialize to the target type - public object? DeserializeTransportObject(string objectJson) + internal object? DeserializeTransportObject(string objectJson, long? currentObjectCount, long? totalObjectCount) { if (objectJson is null) { @@ -182,7 +183,7 @@ public Base Deserialize(string rootObjectJson) object? converted; try { - converted = ConvertJsonElement(doc1); + converted = ConvertJsonElement(doc1, currentObjectCount, totalObjectCount); } catch (Exception ex) when (!ex.IsFatal() && ex is not OperationCanceledException) { @@ -191,13 +192,13 @@ public Base Deserialize(string rootObjectJson) lock (_callbackLock) { - OnProgressAction?.Invoke("DS", 1); + OnProgressAction?.Invoke(new ProgressArgs(ProgressEvent.DeserializeObject, currentObjectCount, totalObjectCount)); } return converted; } - public object? ConvertJsonElement(JToken doc) + private object? ConvertJsonElement(JToken doc, long? currentObjectCount, long? totalObjectCount) { CancellationToken.ThrowIfCancellationRequested(); @@ -239,7 +240,7 @@ public Base Deserialize(string rootObjectJson) int retListCount = 0; foreach (JToken value in docAsArray) { - object? convertedValue = ConvertJsonElement(value); + object? convertedValue = ConvertJsonElement(value, currentObjectCount, totalObjectCount); retListCount += convertedValue is DataChunk chunk ? chunk.data.Count : 1; jsonList.Add(convertedValue); } @@ -270,7 +271,7 @@ public Base Deserialize(string rootObjectJson) continue; } - dict[prop.Name] = ConvertJsonElement(prop.Value); + dict[prop.Name] = ConvertJsonElement(prop.Value, currentObjectCount, totalObjectCount); } if (!dict.TryGetValue(TYPE_DISCRIMINATOR, out object? speckleType)) @@ -319,7 +320,7 @@ public Base Deserialize(string rootObjectJson) throw new TransportException($"Failed to fetch object id {objId} from {ReadTransport} "); } - deserialized = DeserializeTransportObject(objectJson); + deserialized = DeserializeTransportObject(objectJson, currentObjectCount, totalObjectCount); lock (_deserializedObjects) { diff --git a/src/Speckle.Sdk/Serialisation/BaseObjectSerializerV2.cs b/src/Speckle.Sdk/Serialisation/BaseObjectSerializerV2.cs index 069bf6c2..932d4300 100644 --- a/src/Speckle.Sdk/Serialisation/BaseObjectSerializerV2.cs +++ b/src/Speckle.Sdk/Serialisation/BaseObjectSerializerV2.cs @@ -23,7 +23,7 @@ public class BaseObjectSerializerV2 private List> _parentClosures = new(); private HashSet _parentObjects = new(); private readonly Dictionary> _typedPropertiesCache = new(); - private readonly Action? _onProgressAction; + private readonly Action? _onProgressAction; private readonly bool _trackDetachedChildren; @@ -53,7 +53,7 @@ public BaseObjectSerializerV2() /// public BaseObjectSerializerV2( IReadOnlyCollection writeTransports, - Action? onProgressAction = null, + Action? onProgressAction = null, bool trackDetachedChildren = false, CancellationToken cancellationToken = default ) @@ -86,7 +86,8 @@ public string Serialize(Base baseObj) IReadOnlyDictionary converted; try { - var x = PreserializeBase(baseObj, true); + int count = 0; + var x = PreserializeBase(baseObj, ref count, true); if (x is null) { throw new SpeckleSerializeException("Already serialized"); @@ -114,6 +115,7 @@ public string Serialize(Base baseObj) // (primitives, lists and dictionaries with string keys) public object? PreserializeObject( object? obj, + ref int count, bool computeClosures = false, PropertyAttributeInfo inheritedDetachInfo = default ) @@ -156,13 +158,13 @@ public string Serialize(Base baseObj) } // Complex enough to deserve its own function case Base b: - return PreserializeBase(b, computeClosures, inheritedDetachInfo); + return PreserializeBase(b, ref count, computeClosures, inheritedDetachInfo); case IDictionary d: { Dictionary ret = new(d.Count); foreach (DictionaryEntry kvp in d) { - object? converted = PreserializeObject(kvp.Value, inheritedDetachInfo: inheritedDetachInfo); + object? converted = PreserializeObject(kvp.Value, ref count, inheritedDetachInfo: inheritedDetachInfo); if (converted != null) { ret[kvp.Key.ToString()] = converted; @@ -179,7 +181,7 @@ public string Serialize(Base baseObj) foreach (object? element in e) { - ret.Add(PreserializeObject(element, inheritedDetachInfo: inheritedDetachInfo)); + ret.Add(PreserializeObject(element, ref count, inheritedDetachInfo: inheritedDetachInfo)); } return ret; @@ -245,6 +247,7 @@ public string Serialize(Base baseObj) private IReadOnlyDictionary? PreserializeBase( Base baseObj, + ref int count, bool computeClosures = false, PropertyAttributeInfo inheritedDetachInfo = default ) @@ -262,7 +265,7 @@ public string Serialize(Base baseObj) _parentClosures.Add(closure); } - IReadOnlyDictionary convertedBase = PreserializeBaseProperties(baseObj, closure); + IReadOnlyDictionary convertedBase = PreserializeBaseProperties(baseObj, ref count, closure); if (computeClosures || inheritedDetachInfo.IsDetachable || baseObj is Blob) { @@ -284,9 +287,9 @@ public string Serialize(Base baseObj) var id = (string)convertedBase["id"].NotNull(); StoreObject(id, json); ObjectReference objRef = new() { referencedId = id }; - var objRefConverted = (IReadOnlyDictionary?)PreserializeObject(objRef); + var objRefConverted = (IReadOnlyDictionary?)PreserializeObject(objRef, ref count); UpdateParentClosures(id); - _onProgressAction?.Invoke("S", 1); + _onProgressAction?.Invoke(new(ProgressEvent.SerializeObject, ++count, null)); // add to obj refs to return if (baseObj.applicationId != null && _trackDetachedChildren) // && baseObj is not DataChunk && baseObj is not Abstract) // not needed, as data chunks will never have application ids, and abstract objs are not really used. @@ -346,6 +349,7 @@ public string Serialize(Base baseObj) private IReadOnlyDictionary PreserializeBaseProperties( Base baseObj, + ref int count, IReadOnlyDictionary closure ) { @@ -355,7 +359,7 @@ IReadOnlyDictionary closure // Convert all properties foreach (var prop in allProperties) { - object? convertedValue = PreserializeBasePropertyValue(prop.Value.Item1, prop.Value.Item2); + object? convertedValue = PreserializeBasePropertyValue(prop.Value.Item1, ref count, prop.Value.Item2); if ( convertedValue == null @@ -378,12 +382,12 @@ IReadOnlyDictionary closure return convertedBase; } - private object? PreserializeBasePropertyValue(object? baseValue, PropertyAttributeInfo detachInfo) + private object? PreserializeBasePropertyValue(object? baseValue, ref int count, PropertyAttributeInfo detachInfo) { // If there are no WriteTransports, keep everything attached. if (WriteTransports.Count == 0) { - return PreserializeObject(baseValue, inheritedDetachInfo: detachInfo); + return PreserializeObject(baseValue, ref count, inheritedDetachInfo: detachInfo); } if (baseValue is IEnumerable chunkableCollection && detachInfo.IsChunkable) @@ -406,10 +410,14 @@ IReadOnlyDictionary closure chunks.Add(crtChunk); } - return PreserializeObject(chunks, inheritedDetachInfo: new PropertyAttributeInfo(true, false, 0, null)); + return PreserializeObject( + chunks, + ref count, + inheritedDetachInfo: new PropertyAttributeInfo(true, false, 0, null) + ); } - return PreserializeObject(baseValue, inheritedDetachInfo: detachInfo); + return PreserializeObject(baseValue, ref count, inheritedDetachInfo: detachInfo); } private void UpdateParentClosures(string objectId) diff --git a/src/Speckle.Sdk/Serialisation/SerializationUtilities/DeserializationWorkerThreads.cs b/src/Speckle.Sdk/Serialisation/SerializationUtilities/DeserializationWorkerThreads.cs index 6e995412..af7d463f 100644 --- a/src/Speckle.Sdk/Serialisation/SerializationUtilities/DeserializationWorkerThreads.cs +++ b/src/Speckle.Sdk/Serialisation/SerializationUtilities/DeserializationWorkerThreads.cs @@ -48,7 +48,8 @@ protected override void ThreadMain() try { - var result = RunOperation(taskType, inputValue!, _serializer); + (string objectJson, long? current, long? total) = ((string, long?, long?))inputValue!; + var result = RunOperation(taskType, objectJson, current, total, _serializer); tcs.SetResult(result); } catch (Exception ex) @@ -65,14 +66,16 @@ protected override void ThreadMain() private static object? RunOperation( WorkerThreadTaskType taskType, - object inputValue, + string objectJson, + long? current, + long? total, BaseObjectDeserializerV2 serializer ) { switch (taskType) { case WorkerThreadTaskType.Deserialize: - var converted = serializer.DeserializeTransportObject((string)inputValue); + var converted = serializer.DeserializeTransportObject(objectJson, current, total); return converted; default: throw new ArgumentException( @@ -82,7 +85,7 @@ BaseObjectDeserializerV2 serializer } } - internal Task? TryStartTask(WorkerThreadTaskType taskType, object inputValue) + internal Task? TryStartTask(WorkerThreadTaskType taskType, string? objectJson, long? current, long? total) { bool canStartTask = false; lock (_lockFreeThreads) @@ -100,7 +103,7 @@ BaseObjectDeserializerV2 serializer } TaskCompletionSource tcs = new(TaskCreationOptions.RunContinuationsAsynchronously); - Tasks.Add(new(taskType, inputValue, tcs)); + Tasks.Add(new(taskType, (objectJson, current, total), tcs)); return tcs.Task; } } diff --git a/src/Speckle.Sdk/Transports/DiskTransport.cs b/src/Speckle.Sdk/Transports/DiskTransport.cs index ccb4b4cf..06133e3b 100644 --- a/src/Speckle.Sdk/Transports/DiskTransport.cs +++ b/src/Speckle.Sdk/Transports/DiskTransport.cs @@ -44,7 +44,7 @@ public object Clone() public CancellationToken CancellationToken { get; set; } - public Action? OnProgressAction { get; set; } + public Action? OnProgressAction { get; set; } public Action? OnErrorAction { get; set; } @@ -93,7 +93,7 @@ public void SaveObject(string id, string serializedObject) } SavedObjectCount++; - OnProgressAction?.Invoke(TransportName, SavedObjectCount); + OnProgressAction?.Invoke(new(ProgressEvent.DownloadObject, SavedObjectCount, null)); stopwatch.Stop(); Elapsed += stopwatch.Elapsed; } diff --git a/src/Speckle.Sdk/Transports/IServerTransport.cs b/src/Speckle.Sdk/Transports/IServerTransport.cs index 92b91398..41ecb700 100644 --- a/src/Speckle.Sdk/Transports/IServerTransport.cs +++ b/src/Speckle.Sdk/Transports/IServerTransport.cs @@ -2,7 +2,6 @@ namespace Speckle.Sdk.Transports; public interface IServerTransport : IDisposable, ITransport, IBlobCapableTransport, ICloneable { - int TotalSentBytes { get; } Credentials.Account Account { get; } Uri BaseUri { get; } string StreamId { get; } diff --git a/src/Speckle.Sdk/Transports/ITransport.cs b/src/Speckle.Sdk/Transports/ITransport.cs index 5e7cabd9..c997e6da 100644 --- a/src/Speckle.Sdk/Transports/ITransport.cs +++ b/src/Speckle.Sdk/Transports/ITransport.cs @@ -2,6 +2,18 @@ namespace Speckle.Sdk.Transports; +public enum ProgressEvent +{ + DownloadBytes, + UploadBytes, + DownloadObject, + UploadObject, + DeserializeObject, + SerializeObject, +} + +public record ProgressArgs(ProgressEvent ProgressEvent, long? Count, long? Total); + /// /// Interface defining the contract for transport implementations. /// @@ -22,11 +34,6 @@ public interface ITransport /// public TimeSpan Elapsed { get; } - /// - /// Show how many objects the transport saved. - /// - public int SavedObjectCount { get; } - /// /// Should be checked often and gracefully stop all in progress sending if requested. /// @@ -35,7 +42,7 @@ public interface ITransport /// /// Used to report progress during the transport's longer operations. /// - public Action? OnProgressAction { get; set; } + public Action? OnProgressAction { get; set; } /// /// Signals to the transport that writes are about to begin. diff --git a/src/Speckle.Sdk/Transports/Memory.cs b/src/Speckle.Sdk/Transports/Memory.cs index 4eb8d917..760be056 100644 --- a/src/Speckle.Sdk/Transports/Memory.cs +++ b/src/Speckle.Sdk/Transports/Memory.cs @@ -35,7 +35,7 @@ public object Clone() public string TransportName { get; set; } = "Memory"; - public Action? OnProgressAction { get; set; } + public Action? OnProgressAction { get; set; } public int SavedObjectCount { get; private set; } @@ -58,7 +58,7 @@ public void SaveObject(string id, string serializedObject) Objects[id] = serializedObject; SavedObjectCount++; - OnProgressAction?.Invoke(TransportName, 1); + OnProgressAction?.Invoke(new(ProgressEvent.UploadObject, 1, 1)); stopwatch.Stop(); Elapsed += stopwatch.Elapsed; } diff --git a/src/Speckle.Sdk/Transports/SQLite.cs b/src/Speckle.Sdk/Transports/SQLite.cs index 3389cde9..127d5e22 100644 --- a/src/Speckle.Sdk/Transports/SQLite.cs +++ b/src/Speckle.Sdk/Transports/SQLite.cs @@ -115,7 +115,7 @@ public void Dispose() public CancellationToken CancellationToken { get; set; } - public Action? OnProgressAction { get; set; } + public Action? OnProgressAction { get; set; } public int SavedObjectCount { get; private set; } @@ -331,7 +331,7 @@ private void ConsumeQueue() CancellationToken.ThrowIfCancellationRequested(); } - OnProgressAction?.Invoke(TransportName, saved); + OnProgressAction?.Invoke(new(ProgressEvent.DownloadObject, saved, _queue.Count + 1)); CancellationToken.ThrowIfCancellationRequested(); diff --git a/src/Speckle.Sdk/Transports/ServerUtils/HttpHeaderExtensions.cs b/src/Speckle.Sdk/Transports/ServerUtils/HttpHeaderExtensions.cs new file mode 100644 index 00000000..6d8e82d4 --- /dev/null +++ b/src/Speckle.Sdk/Transports/ServerUtils/HttpHeaderExtensions.cs @@ -0,0 +1,14 @@ +using System.Net.Http.Headers; + +namespace Speckle.Sdk.Transports.ServerUtils; + +public static class HttpHeaderExtensions +{ + public static void CopyTo(this HttpContentHeaders source, HttpContentHeaders destination) + { + foreach (var header in source) + { + destination.Add(header.Key, header.Value); + } + } +} diff --git a/src/Speckle.Sdk/Transports/ServerUtils/IServerApi.cs b/src/Speckle.Sdk/Transports/ServerUtils/IServerApi.cs index c6afe823..4ea9dd47 100644 --- a/src/Speckle.Sdk/Transports/ServerUtils/IServerApi.cs +++ b/src/Speckle.Sdk/Transports/ServerUtils/IServerApi.cs @@ -1,19 +1,31 @@ namespace Speckle.Sdk.Transports.ServerUtils; public delegate void CbObjectDownloaded(string id, string json); -public delegate void CbBlobdDownloaded(); internal interface IServerApi { - public Task DownloadSingleObject(string streamId, string objectId); + public Task DownloadSingleObject(string streamId, string objectId, Action? progress); - public Task DownloadObjects(string streamId, IReadOnlyList objectIds, CbObjectDownloaded onObjectCallback); + public Task DownloadObjects( + string streamId, + IReadOnlyList objectIds, + Action? progress, + CbObjectDownloaded onObjectCallback + ); public Task> HasObjects(string streamId, IReadOnlyList objectIds); - public Task UploadObjects(string streamId, IReadOnlyList<(string id, string data)> objects); + public Task UploadObjects( + string streamId, + IReadOnlyList<(string id, string data)> objects, + Action? progress + ); - public Task UploadBlobs(string streamId, IReadOnlyList<(string id, string data)> objects); + public Task UploadBlobs( + string streamId, + IReadOnlyList<(string id, string data)> objects, + Action? progress + ); - public Task DownloadBlobs(string streamId, IReadOnlyList blobIds, CbBlobdDownloaded onBlobCallback); + public Task DownloadBlobs(string streamId, IReadOnlyList blobIds, Action? progress); } diff --git a/src/Speckle.Sdk/Transports/ServerUtils/ParallelServerAPI.cs b/src/Speckle.Sdk/Transports/ServerUtils/ParallelServerAPI.cs index 12de5a1a..34bb6bb7 100644 --- a/src/Speckle.Sdk/Transports/ServerUtils/ParallelServerAPI.cs +++ b/src/Speckle.Sdk/Transports/ServerUtils/ParallelServerAPI.cs @@ -1,6 +1,7 @@ using System.Collections.Concurrent; using System.Diagnostics; using Speckle.Sdk.Common; +using Speckle.Sdk.Helpers; using Speckle.Sdk.Logging; using Speckle.Sdk.Serialisation.SerializationUtilities; @@ -24,8 +25,6 @@ internal class ParallelServerApi : ParallelOperationExecutor private readonly Uri _baseUri; - private readonly object _callbackLock = new(); - private readonly int _timeoutSeconds; public ParallelServerApi( @@ -50,7 +49,6 @@ public ParallelServerApi( public CancellationToken CancellationToken { get; set; } public bool CompressPayloads { get; set; } = true; - public Action OnBatchSent { get; set; } public string BlobStorageFolder { get; set; } @@ -83,8 +81,8 @@ public async Task> HasObjects(string streamId, IReadOnl Dictionary ret = new(); foreach (var task in tasks) { - var taskResult = (IReadOnlyDictionary)(await task.ConfigureAwait(false))!; - foreach (KeyValuePair kv in taskResult) + var taskResult = (IReadOnlyDictionary?)(await task.ConfigureAwait(false)); + foreach (KeyValuePair kv in taskResult.Empty()) { ret[kv.Key] = kv.Value; } @@ -93,17 +91,18 @@ public async Task> HasObjects(string streamId, IReadOnl return ret; } - public async Task DownloadSingleObject(string streamId, string objectId) + public async Task DownloadSingleObject(string streamId, string objectId, Action? progress) { EnsureStarted(); - Task op = QueueOperation(ServerApiOperation.DownloadSingleObject, (streamId, objectId)); + Task op = QueueOperation(ServerApiOperation.DownloadSingleObject, (streamId, objectId, progress)); object? result = await op.ConfigureAwait(false); - return (string)result!; + return (string?)result; } public async Task DownloadObjects( string streamId, IReadOnlyList objectIds, + Action? progress, CbObjectDownloaded onObjectCallback ) { @@ -129,14 +128,18 @@ CbObjectDownloaded onObjectCallback Task op = QueueOperation( ServerApiOperation.DownloadObjects, - (streamId, splitObjectsIds[i], callbackWrapper) + (streamId, splitObjectsIds[i], progress, callbackWrapper) ); tasks.Add(op); } await Task.WhenAll(tasks.ToArray()).ConfigureAwait(false); } - public async Task UploadObjects(string streamId, IReadOnlyList<(string, string)> objects) + public async Task UploadObjects( + string streamId, + IReadOnlyList<(string, string)> objects, + Action? progress + ) { EnsureStarted(); List> tasks = new(); @@ -162,23 +165,23 @@ public async Task UploadObjects(string streamId, IReadOnlyList<(string, string)> continue; } - var op = QueueOperation(ServerApiOperation.UploadObjects, (streamId, splitObjects[i])); + var op = QueueOperation(ServerApiOperation.UploadObjects, (streamId, splitObjects[i], progress)); tasks.Add(op); } await Task.WhenAll(tasks.ToArray()).ConfigureAwait(false); } - public async Task UploadBlobs(string streamId, IReadOnlyList<(string, string)> blobs) + public async Task UploadBlobs(string streamId, IReadOnlyList<(string, string)> blobs, Action? progress) { EnsureStarted(); - var op = QueueOperation(ServerApiOperation.UploadBlobs, (streamId, blobs)); + var op = QueueOperation(ServerApiOperation.UploadBlobs, (streamId, blobs, progress)); await op.ConfigureAwait(false); } - public async Task DownloadBlobs(string streamId, IReadOnlyList blobIds, CbBlobdDownloaded onBlobDownloaded) + public async Task DownloadBlobs(string streamId, IReadOnlyList blobIds, Action? progress) { EnsureStarted(); - var op = QueueOperation(ServerApiOperation.DownloadBlobs, (streamId, blobIds, onBlobDownloaded)); + var op = QueueOperation(ServerApiOperation.DownloadBlobs, (streamId, blobIds, progress)); await op.ConfigureAwait(false); } @@ -204,14 +207,6 @@ public void EnsureStarted() protected override void ThreadMain() { using ServerApi serialApi = new(_baseUri, _authToken, BlobStorageFolder, _timeoutSeconds); - - serialApi.OnBatchSent = (num, size) => - { - lock (_callbackLock) - { - OnBatchSent(num, size); - } - }; serialApi.CancellationToken = CancellationToken; serialApi.CompressPayloads = CompressPayloads; @@ -225,7 +220,7 @@ protected override void ThreadMain() try { - var result = RunOperation(operation, inputValue!, serialApi).GetAwaiter().GetResult(); + var result = RunOperation(operation, inputValue.NotNull(), serialApi).GetAwaiter().GetResult(); tcs.SetResult(result); } catch (Exception ex) @@ -245,22 +240,35 @@ protected override void ThreadMain() switch (operation) { case ServerApiOperation.DownloadSingleObject: - var (dsoStreamId, dsoObjectId) = ((string, string))inputValue; - return await serialApi.DownloadSingleObject(dsoStreamId, dsoObjectId).ConfigureAwait(false); + var (dsoStreamId, dsoObjectId, progress) = ((string, string, Action?))inputValue; + return await serialApi.DownloadSingleObject(dsoStreamId, dsoObjectId, progress).ConfigureAwait(false); case ServerApiOperation.DownloadObjects: - var (doStreamId, doObjectIds, doCallback) = ((string, IReadOnlyList, CbObjectDownloaded))inputValue; - await serialApi.DownloadObjects(doStreamId, doObjectIds, doCallback).ConfigureAwait(false); + var (doStreamId, doObjectIds, progress2, doCallback) = (( + string, + IReadOnlyList, + Action?, + CbObjectDownloaded + ))inputValue; + await serialApi.DownloadObjects(doStreamId, doObjectIds, progress2, doCallback).ConfigureAwait(false); return null; case ServerApiOperation.HasObjects: var (hoStreamId, hoObjectIds) = ((string, IReadOnlyList))inputValue; return await serialApi.HasObjects(hoStreamId, hoObjectIds).ConfigureAwait(false); case ServerApiOperation.UploadObjects: - var (uoStreamId, uoObjects) = ((string, IReadOnlyList<(string, string)>))inputValue; - await serialApi.UploadObjects(uoStreamId, uoObjects).ConfigureAwait(false); + var (uoStreamId, uoObjects, progress3) = (( + string, + IReadOnlyList<(string, string)>, + Action? + ))inputValue; + await serialApi.UploadObjects(uoStreamId, uoObjects, progress3).ConfigureAwait(false); return null; case ServerApiOperation.UploadBlobs: - var (ubStreamId, ubBlobs) = ((string, IReadOnlyList<(string, string)>))inputValue; - await serialApi.UploadBlobs(ubStreamId, ubBlobs).ConfigureAwait(false); + var (ubStreamId, ubBlobs, progress4) = (( + string, + IReadOnlyList<(string, string)>, + Action? + ))inputValue; + await serialApi.UploadBlobs(ubStreamId, ubBlobs, progress4).ConfigureAwait(false); return null; case ServerApiOperation.HasBlobs: var (hbStreamId, hBlobs) = ((string, IReadOnlyList<(string, string)>))inputValue; @@ -268,8 +276,8 @@ protected override void ThreadMain() .HasBlobs(hbStreamId, hBlobs.Select(b => b.Item1.Split(':')[1]).ToList()) .ConfigureAwait(false); case ServerApiOperation.DownloadBlobs: - var (dbStreamId, blobIds, cb) = ((string, IReadOnlyList, CbBlobdDownloaded))inputValue; - await serialApi.DownloadBlobs(dbStreamId, blobIds, cb).ConfigureAwait(false); + var (dbStreamId, blobIds, progress5) = ((string, IReadOnlyList, Action?))inputValue; + await serialApi.DownloadBlobs(dbStreamId, blobIds, progress5).ConfigureAwait(false); return null; default: throw new ArgumentOutOfRangeException(nameof(operation), operation, null); diff --git a/src/Speckle.Sdk/Transports/ServerUtils/ProgressContent.cs b/src/Speckle.Sdk/Transports/ServerUtils/ProgressContent.cs new file mode 100644 index 00000000..eddd09d3 --- /dev/null +++ b/src/Speckle.Sdk/Transports/ServerUtils/ProgressContent.cs @@ -0,0 +1,45 @@ +using System.Net; + +namespace Speckle.Sdk.Transports.ServerUtils; + +internal class ProgressContent : HttpContent +{ + private readonly HttpContent _innerContent; + private readonly Action? _progress; + + public ProgressContent(HttpContent innerContent, Action? progress) + { + _innerContent = innerContent; + _progress = progress; + + innerContent.Headers.CopyTo(Headers); + } + + protected override Task SerializeToStreamAsync(Stream stream, TransportContext context) + { + ProgressStream progressStream = new(stream, _innerContent.Headers.ContentLength, _progress, false); + return _innerContent.CopyToAsync(progressStream); + } + + protected override bool TryComputeLength(out long length) + { + long? contentLength = _innerContent.Headers.ContentLength; + if (contentLength.HasValue) + { + length = contentLength.Value; + return true; + } + + length = -1; + return false; + } + + protected override void Dispose(bool disposing) + { + base.Dispose(disposing); + if (disposing) + { + _innerContent.Dispose(); + } + } +} diff --git a/src/Speckle.Sdk/Transports/ServerUtils/ProgressStream.cs b/src/Speckle.Sdk/Transports/ServerUtils/ProgressStream.cs new file mode 100644 index 00000000..25591dbf --- /dev/null +++ b/src/Speckle.Sdk/Transports/ServerUtils/ProgressStream.cs @@ -0,0 +1,38 @@ +namespace Speckle.Sdk.Transports; + +internal class ProgressStream(Stream input, long? streamLength, Action? progress, bool useBuffer) : Stream +{ + private long _position; + private readonly Stream _stream = useBuffer ? new BufferedStream(input, 80 * 1024) : input; + + public override void Flush() => _stream.Flush(); + + public override long Seek(long offset, SeekOrigin origin) => throw new NotImplementedException(); + + public override void SetLength(long value) => throw new NotImplementedException(); + + public override int Read(byte[] buffer, int offset, int count) + { + int n = _stream.Read(buffer, offset, count); + _position += n; + progress?.Invoke(new(ProgressEvent.DownloadBytes, _position, streamLength)); + return n; + } + + public override void Write(byte[] buffer, int offset, int count) + { + _stream.Write(buffer, offset, count); + _position += count; + progress?.Invoke(new(ProgressEvent.UploadBytes, _position, streamLength)); + } + + public override bool CanRead => true; + public override bool CanSeek => false; + public override bool CanWrite => true; + public override long Length => streamLength ?? 0; + public override long Position + { + get => _position; + set => throw new NotImplementedException(); + } +} diff --git a/src/Speckle.Sdk/Transports/ServerUtils/ServerAPI.cs b/src/Speckle.Sdk/Transports/ServerUtils/ServerAPI.cs index b8e9b12c..b14fb6d6 100644 --- a/src/Speckle.Sdk/Transports/ServerUtils/ServerAPI.cs +++ b/src/Speckle.Sdk/Transports/ServerUtils/ServerAPI.cs @@ -3,6 +3,7 @@ using System.Text; using Speckle.Newtonsoft.Json; using Speckle.Newtonsoft.Json.Linq; +using Speckle.Sdk.Common; using Speckle.Sdk.Helpers; using Speckle.Sdk.Logging; using Speckle.Sdk.Models; @@ -32,7 +33,7 @@ public ServerApi(Uri baseUri, string? authorizationToken, string blobStorageFold BlobStorageFolder = blobStorageFolder; _client = Http.GetHttpProxyClient( - new SpeckleHttpClientHandler { AutomaticDecompression = DecompressionMethods.GZip } + new SpeckleHttpClientHandler(new HttpClientHandler { AutomaticDecompression = DecompressionMethods.GZip }) ); _client.BaseAddress = baseUri; @@ -47,18 +48,14 @@ public ServerApi(Uri baseUri, string? authorizationToken, string blobStorageFold public string BlobStorageFolder { get; set; } - /// - /// Callback when sending batches. Parameters: object count, total bytes sent - /// - public Action OnBatchSent { get; set; } - public void Dispose() { _client.Dispose(); } - public async Task DownloadSingleObject(string streamId, string objectId) + public async Task DownloadSingleObject(string streamId, string objectId, Action? progress) { + using var _ = SpeckleActivityFactory.Start(); CancellationToken.ThrowIfCancellationRequested(); // Get root object @@ -72,15 +69,15 @@ public async Task DownloadSingleObject(string streamId, string objectId) .SendAsync(rootHttpMessage, HttpCompletionOption.ResponseContentRead, CancellationToken) .ConfigureAwait(false); - rootHttpResponse.EnsureSuccessStatusCode(); - - string rootObjectStr = await rootHttpResponse.Content.ReadAsStringAsync().ConfigureAwait(false); + string? rootObjectStr = null; + await ResponseProgress(rootHttpResponse, progress, (_, json) => rootObjectStr = json, true).ConfigureAwait(false); return rootObjectStr; } public async Task DownloadObjects( string streamId, IReadOnlyList objectIds, + Action? progress, CbObjectDownloaded onObjectCallback ) { @@ -88,10 +85,11 @@ CbObjectDownloaded onObjectCallback { return; } + using var _ = SpeckleActivityFactory.Start(); if (objectIds.Count < BATCH_SIZE_GET_OBJECTS) { - await DownloadObjectsImpl(streamId, objectIds, onObjectCallback).ConfigureAwait(false); + await DownloadObjectsImpl(streamId, objectIds, progress, onObjectCallback).ConfigureAwait(false); return; } @@ -100,12 +98,12 @@ CbObjectDownloaded onObjectCallback { if (crtRequest.Count >= BATCH_SIZE_GET_OBJECTS) { - await DownloadObjectsImpl(streamId, crtRequest, onObjectCallback).ConfigureAwait(false); + await DownloadObjectsImpl(streamId, crtRequest, progress, onObjectCallback).ConfigureAwait(false); crtRequest = new List(); } crtRequest.Add(id); } - await DownloadObjectsImpl(streamId, crtRequest, onObjectCallback).ConfigureAwait(false); + await DownloadObjectsImpl(streamId, crtRequest, progress, onObjectCallback).ConfigureAwait(false); } public async Task> HasObjects(string streamId, IReadOnlyList objectIds) @@ -142,7 +140,11 @@ public async Task> HasObjects(string streamId, IReadOnl return ret; } - public async Task UploadObjects(string streamId, IReadOnlyList<(string, string)> objects) + public async Task UploadObjects( + string streamId, + IReadOnlyList<(string, string)> objects, + Action? progress + ) { if (objects.Count == 0) { @@ -189,31 +191,30 @@ public async Task UploadObjects(string streamId, IReadOnlyList<(string, string)> // 2. Split multiparts into individual server requests of max size MAX_REQUEST_SIZE or max length MAX_MULTIPART_COUNT and send them List> crtRequest = new(); int crtRequestSize = 0; - int crtObjectCount = 0; for (int i = 0; i < multipartedObjects.Count; i++) { List<(string, string)> multipart = multipartedObjects[i]; int multipartSize = multipartedObjectsSize[i]; if (crtRequestSize + multipartSize > MAX_REQUEST_SIZE || crtRequest.Count >= MAX_MULTIPART_COUNT) { - await UploadObjectsImpl(streamId, crtRequest).ConfigureAwait(false); - OnBatchSent?.Invoke(crtObjectCount, crtRequestSize); + await UploadObjectsImpl(streamId, crtRequest, progress).ConfigureAwait(false); crtRequest = new List>(); crtRequestSize = 0; - crtObjectCount = 0; } crtRequest.Add(multipart); crtRequestSize += multipartSize; - crtObjectCount += multipart.Count; } if (crtRequest.Count > 0) { - await UploadObjectsImpl(streamId, crtRequest).ConfigureAwait(false); - OnBatchSent?.Invoke(crtObjectCount, crtRequestSize); + await UploadObjectsImpl(streamId, crtRequest, progress).ConfigureAwait(false); } } - public async Task UploadBlobs(string streamId, IReadOnlyList<(string, string)> objects) + public async Task UploadBlobs( + string streamId, + IReadOnlyList<(string, string)> objects, + Action? progress + ) { CancellationToken.ThrowIfCancellationRequested(); if (objects.Count == 0) @@ -238,7 +239,7 @@ public async Task UploadBlobs(string streamId, IReadOnlyList<(string, string)> o { RequestUri = new Uri($"/api/stream/{streamId}/blob", UriKind.Relative), Method = HttpMethod.Post, - Content = multipartFormDataContent + Content = new ProgressContent(multipartFormDataContent, progress) }; try @@ -261,7 +262,7 @@ public async Task UploadBlobs(string streamId, IReadOnlyList<(string, string)> o } } - public async Task DownloadBlobs(string streamId, IReadOnlyList blobIds, CbBlobdDownloaded onBlobCallback) + public async Task DownloadBlobs(string streamId, IReadOnlyList blobIds, Action? progress) { foreach (var blobId in blobIds) { @@ -283,12 +284,14 @@ public async Task DownloadBlobs(string streamId, IReadOnlyList blobIds, BlobStorageFolder, $"{blobId.Substring(0, Blob.LocalHashPrefixLength)}-{fileName}" ); - using (var fs = new FileStream(fileLocation, FileMode.OpenOrCreate)) - { - await response.Content.CopyToAsync(fs).ConfigureAwait(false); - } - - onBlobCallback(); + using var source = new ProgressStream( + await response.Content.ReadAsStreamAsync(), + response.Content.Headers.ContentLength, + progress, + true + ); + using var fs = new FileStream(fileLocation, FileMode.OpenOrCreate); + await source.CopyToAsync(fs).ConfigureAwait(false); } catch (Exception ex) when (!ex.IsFatal()) { @@ -300,6 +303,7 @@ public async Task DownloadBlobs(string streamId, IReadOnlyList blobIds, private async Task DownloadObjectsImpl( string streamId, IReadOnlyList objectIds, + Action? progress, CbObjectDownloaded onObjectCallback ) { @@ -319,23 +323,39 @@ CbObjectDownloaded onObjectCallback childrenHttpMessage.Headers.Add("Accept", "text/plain"); HttpResponseMessage childrenHttpResponse = await _client - .SendAsync(childrenHttpMessage, HttpCompletionOption.ResponseHeadersRead, CancellationToken) + .SendAsync(childrenHttpMessage, CancellationToken) .ConfigureAwait(false); - childrenHttpResponse.EnsureSuccessStatusCode(); + await ResponseProgress(childrenHttpResponse, progress, onObjectCallback, false).ConfigureAwait(false); + } + private async Task ResponseProgress( + HttpResponseMessage childrenHttpResponse, + Action? progress, + CbObjectDownloaded onObjectCallback, + bool isSingle + ) + { + childrenHttpResponse.EnsureSuccessStatusCode(); + var length = childrenHttpResponse.Content.Headers.ContentLength; using Stream childrenStream = await childrenHttpResponse.Content.ReadAsStreamAsync().ConfigureAwait(false); - using var reader = new StreamReader(childrenStream, Encoding.UTF8); - while (reader.ReadLine() is { } line) + using var reader = new StreamReader(new ProgressStream(childrenStream, length, progress, true), Encoding.UTF8); + while (await reader.ReadLineAsync().ConfigureAwait(false) is { } line) { CancellationToken.ThrowIfCancellationRequested(); - var pcs = line.Split(s_separator, 2); - onObjectCallback(pcs[0], pcs[1]); + if (!isSingle) + { + var pcs = line.Split(s_separator, 2); + onObjectCallback(pcs[0], pcs[1]); + } + else + { + onObjectCallback(string.Empty, line); + break; + } } - - // Console.WriteLine($"ServerApi::DownloadObjects({objectIds.Count}) request in {sw.ElapsedMilliseconds / 1000.0} sec"); } private async Task> HasObjectsImpl(string streamId, IReadOnlyList objectIds) @@ -360,7 +380,7 @@ private async Task> HasObjectsImpl(string streamId, IRe JObject doc = JObject.Parse(hasObjectsJson); foreach (KeyValuePair prop in doc) { - hasObjects[prop.Key] = (bool)prop!.Value!; + hasObjects[prop.Key] = (bool)prop.Value.NotNull(); } // Console.WriteLine($"ServerApi::HasObjects({objectIds.Count}) request in {sw.ElapsedMilliseconds / 1000.0} sec"); @@ -368,10 +388,12 @@ private async Task> HasObjectsImpl(string streamId, IRe return hasObjects; } - private async Task UploadObjectsImpl(string streamId, List> multipartedObjects) + private async Task UploadObjectsImpl( + string streamId, + List> multipartedObjects, + Action? progress + ) { - // Stopwatch sw = new Stopwatch(); sw.Start(); - CancellationToken.ThrowIfCancellationRequested(); using HttpRequestMessage message = @@ -408,12 +430,10 @@ private async Task UploadObjectsImpl(string streamId, List> HasBlobs(string streamId, IReadOnlyList blobIds) @@ -438,16 +458,4 @@ public async Task> HasBlobs(string streamId, IReadOnlyList return parsed; } - - private sealed class BlobUploadResult - { - public List uploadResults { get; set; } - } - - private sealed class BlobUploadResultItem - { - public string blobId { get; set; } - public string formKey { get; set; } - public string fileName { get; set; } - } } diff --git a/src/Speckle.Sdk/Transports/ServerV2.cs b/src/Speckle.Sdk/Transports/ServerV2.cs index 4632c408..8fde59fe 100644 --- a/src/Speckle.Sdk/Transports/ServerV2.cs +++ b/src/Speckle.Sdk/Transports/ServerV2.cs @@ -1,5 +1,6 @@ using System.Diagnostics; using Speckle.Newtonsoft.Json.Linq; +using Speckle.Sdk.Common; using Speckle.Sdk.Credentials; using Speckle.Sdk.Logging; using Speckle.Sdk.Models; @@ -45,8 +46,6 @@ public ServerTransport(Account account, string streamId, int timeoutSeconds = 60 Directory.CreateDirectory(BlobStorageFolder); } - public int TotalSentBytes { get; private set; } - public Account Account { get; } public Uri BaseUri { get; } public string StreamId { get; internal set; } @@ -105,8 +104,7 @@ public void Dispose() }; public CancellationToken CancellationToken { get; set; } - public Action? OnProgressAction { get; set; } - public int SavedObjectCount { get; private set; } + public Action? OnProgressAction { get; set; } public TimeSpan Elapsed { get; private set; } = TimeSpan.Zero; public async Task CopyObjectAndChildren( @@ -127,8 +125,8 @@ public async Task CopyObjectAndChildren( var stopwatch = Stopwatch.StartNew(); api.CancellationToken = CancellationToken; - string rootObjectJson = await api.DownloadSingleObject(StreamId, id).ConfigureAwait(false); - IList allIds = ParseChildrenIds(rootObjectJson); + string? rootObjectJson = await api.DownloadSingleObject(StreamId, id, OnProgressAction).ConfigureAwait(false); + IList allIds = ParseChildrenIds(rootObjectJson.NotNull()); List childrenIds = allIds.Where(x => !x.Contains("blob:")).ToList(); List blobIds = allIds.Where(x => x.Contains("blob:")).Select(x => x.Remove(0, 5)).ToList(); @@ -148,11 +146,11 @@ public async Task CopyObjectAndChildren( await api.DownloadObjects( StreamId, newChildrenIds, + OnProgressAction, (childId, childData) => { stopwatch.Stop(); targetTransport.SaveObject(childId, childData); - OnProgressAction?.Invoke(TransportName, 1); stopwatch.Start(); } ) @@ -180,15 +178,7 @@ await api.DownloadObjects( .Where(blobId => !localBlobTrimmedHashes.Contains(blobId.Substring(0, Blob.LocalHashPrefixLength))) .ToList(); - await api.DownloadBlobs( - StreamId, - newBlobIds, - () => - { - OnProgressAction?.Invoke(TransportName, 1); - } - ) - .ConfigureAwait(false); + await api.DownloadBlobs(StreamId, newBlobIds, OnProgressAction).ConfigureAwait(false); stopwatch.Stop(); Elapsed += stopwatch.Elapsed; @@ -199,10 +189,10 @@ public string GetObject(string id) { CancellationToken.ThrowIfCancellationRequested(); var stopwatch = Stopwatch.StartNew(); - var result = Api.DownloadSingleObject(StreamId, id).Result; + var result = Api.DownloadSingleObject(StreamId, id, OnProgressAction).Result; stopwatch.Stop(); Elapsed += stopwatch.Elapsed; - return result; + return result.NotNull(); } public async Task> HasObjects(IReadOnlyList objectIds) @@ -231,9 +221,6 @@ public void BeginWrite() throw new InvalidOperationException("ServerTransport already sending"); } - TotalSentBytes = 0; - SavedObjectCount = 0; - _exception = null; _shouldSendThreadRun = true; _sendingThread = new Thread(SendingThreadMain) { Name = "ServerTransportSender", IsBackground = true }; @@ -279,15 +266,7 @@ private void Initialize(string baseUri) { SpeckleLog.Logger.Information("Initializing a new Remote Transport for {baseUri}", baseUri); - Api = new ParallelServerApi(BaseUri, AuthorizationToken, BlobStorageFolder, TimeoutSeconds) - { - OnBatchSent = (num, size) => - { - OnProgressAction?.Invoke(TransportName, num); - TotalSentBytes += size; - SavedObjectCount += num; - } - }; + Api = new ParallelServerApi(BaseUri, AuthorizationToken, BlobStorageFolder, TimeoutSeconds); } public override string ToString() @@ -368,10 +347,7 @@ private async void SendingThreadMain() } } - // Report the objects that are already on the server - OnProgressAction?.Invoke(TransportName, hasObjects.Count - newObjects.Count); - - await Api.UploadObjects(StreamId, newObjects).ConfigureAwait(false); + await Api.UploadObjects(StreamId, newObjects, OnProgressAction).ConfigureAwait(false); if (bufferBlobs.Count != 0) { @@ -380,7 +356,7 @@ private async void SendingThreadMain() var newBlobs = bufferBlobs.Where(tuple => formattedIds.IndexOf(tuple.id) != -1).ToList(); if (newBlobs.Count != 0) { - await Api.UploadBlobs(StreamId, newBlobs).ConfigureAwait(false); + await Api.UploadBlobs(StreamId, newBlobs, OnProgressAction).ConfigureAwait(false); } } } diff --git a/src/Speckle.Sdk/Transports/TransportHelpers.cs b/src/Speckle.Sdk/Transports/TransportHelpers.cs index e4f5746e..9dfdfda7 100644 --- a/src/Speckle.Sdk/Transports/TransportHelpers.cs +++ b/src/Speckle.Sdk/Transports/TransportHelpers.cs @@ -1,4 +1,4 @@ -using System.Diagnostics.CodeAnalysis; +using System.Diagnostics.CodeAnalysis; using Speckle.Newtonsoft.Json; using Speckle.Sdk.Serialisation; @@ -51,7 +51,8 @@ CancellationToken cancellationToken } targetTransport.SaveObject(kvp.Key, child); - sourceTransport.OnProgressAction?.Invoke($"{sourceTransport}", i++); + var count = i++; + sourceTransport.OnProgressAction?.Invoke(new ProgressArgs(ProgressEvent.UploadObject, count, closures.Count)); } } diff --git a/tests/Speckle.Sdk.Serialization.Tests/TestTransport.cs b/tests/Speckle.Sdk.Serialization.Tests/TestTransport.cs index 9baf0797..5b237e2b 100644 --- a/tests/Speckle.Sdk.Serialization.Tests/TestTransport.cs +++ b/tests/Speckle.Sdk.Serialization.Tests/TestTransport.cs @@ -21,7 +21,7 @@ public string TransportName public TimeSpan Elapsed { get; } public int SavedObjectCount { get; } public CancellationToken CancellationToken { get; set; } - public Action? OnProgressAction { get; set; } + public Action? OnProgressAction { get; set; } public Action? OnErrorAction { get; set; } public void BeginWrite() => throw new NotImplementedException(); diff --git a/tests/Speckle.Sdk.Tests.Unit/Api/Operations/SendReceiveLocal.cs b/tests/Speckle.Sdk.Tests.Unit/Api/Operations/SendReceiveLocal.cs index fca3683c..ba2f3c95 100644 --- a/tests/Speckle.Sdk.Tests.Unit/Api/Operations/SendReceiveLocal.cs +++ b/tests/Speckle.Sdk.Tests.Unit/Api/Operations/SendReceiveLocal.cs @@ -52,7 +52,7 @@ public async Task LocalUpload() [Test(Description = "Pulling a commit locally"), Order(2)] public async Task LocalDownload() { - var commitPulled = await Sdk.Api.Operations.Receive(_objId01!); + var commitPulled = await Sdk.Api.Operations.Receive(_objId01.NotNull()); Assert.That(((List)commitPulled["@items"].NotNull())[0], Is.TypeOf()); Assert.That(((List)commitPulled["@items"].NotNull()), Has.Count.EqualTo(NUM_OBJECTS)); @@ -196,7 +196,7 @@ public async Task UploadProgressReports() ); } - ConcurrentDictionary? progress = null; + ConcurrentBag? progress = null; (_commitId02, _) = await Sdk.Api.Operations.Send( myObject, _sut, @@ -207,22 +207,22 @@ public async Task UploadProgressReports() } ); progress.NotNull(); - Assert.That(progress.Keys, Has.Count.GreaterThanOrEqualTo(1)); + Assert.That(progress, Has.Count.GreaterThanOrEqualTo(1)); } [Test(Description = "Should show progress!"), Order(5)] public async Task DownloadProgressReports() { - ConcurrentDictionary? progress = null; - var pulledCommit = await Sdk.Api.Operations.Receive( - _commitId02!, + ConcurrentBag? progress = null; + await Sdk.Api.Operations.Receive( + _commitId02.NotNull(), onProgressAction: dict => { progress = dict; } ); progress.NotNull(); - Assert.That(progress.Keys, Has.Count.GreaterThanOrEqualTo(1)); + Assert.That(progress, Has.Count.GreaterThanOrEqualTo(1)); } [Test(Description = "Should not dispose of transports if so specified.")] diff --git a/tests/Speckle.Sdk.Tests.Unit/Transports/TransportTests.cs b/tests/Speckle.Sdk.Tests.Unit/Transports/TransportTests.cs index a2b9e996..1574bc9d 100644 --- a/tests/Speckle.Sdk.Tests.Unit/Transports/TransportTests.cs +++ b/tests/Speckle.Sdk.Tests.Unit/Transports/TransportTests.cs @@ -97,7 +97,7 @@ public async Task SaveObject_ConcurrentWrites() public async Task ProgressAction_Called_OnSaveObject() { bool wasCalled = false; - Sut.NotNull().OnProgressAction = (_, _) => wasCalled = true; + Sut.NotNull().OnProgressAction = (_) => wasCalled = true; Sut.SaveObject("12345", "fake payload data");