Skip to content

Commit

Permalink
Add Progress for transfers (#74)
Browse files Browse the repository at this point in the history
* progress intermediate commit

* add progress for download

* remove unused code

* remove batch sent callbacks

* multi-threaded deserialize works

* Progress for download and deserialization

* Fix tests

* Have less indeterminate deserialization

* fix deserialization

* make download faster with buffered stream

* put local receive back

* remove unused callback

* fmt
  • Loading branch information
adamhathcock authored Aug 15, 2024
1 parent ea5ed87 commit dc4da49
Show file tree
Hide file tree
Showing 29 changed files with 412 additions and 389 deletions.
2 changes: 1 addition & 1 deletion src/Speckle.Sdk/Api/GraphQL/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ private static GraphQLHttpClient CreateGraphQLClient(Account account, HttpClient

private static HttpClient CreateHttpClient(Account account)
{
var httpClient = Http.GetHttpProxyClient(null, TimeSpan.FromSeconds(30));
var httpClient = Http.GetHttpProxyClient(timeout: TimeSpan.FromSeconds(30));
Http.AddAuthHeader(httpClient, account.token);

httpClient.DefaultRequestHeaders.Add("apollographql-client-name", Setup.ApplicationVersion);
Expand Down
117 changes: 9 additions & 108 deletions src/Speckle.Sdk/Api/Helpers.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
#nullable disable
using System.Collections.Concurrent;
using System.Diagnostics.Contracts;
using System.Reflection;
using System.Runtime.InteropServices;
using Speckle.Newtonsoft.Json;
using Speckle.Sdk.Api.GraphQL.Models;
using Speckle.Sdk.Common;
using Speckle.Sdk.Credentials;
using Speckle.Sdk.Helpers;
using Speckle.Sdk.Host;
Expand All @@ -30,9 +30,9 @@ public static class Helpers
public static async Task<Base> Receive(
this IServerTransportFactory serverTransportFactory,
string stream,
Account account = null,
Action<ConcurrentDictionary<string, int>> onProgressAction = null,
Action<int> onTotalChildrenCountKnown = null
Account? account = null,
Action<ConcurrentBag<ProgressArgs>>? onProgressAction = null,
Action<int>? onTotalChildrenCountKnown = null
)
{
var sw = new StreamWrapper(stream);
Expand Down Expand Up @@ -61,25 +61,25 @@ public static async Task<Base> Receive(
using var transport = serverTransportFactory.Create(client.Account, sw.StreamId);

string objectId = "";
Commit commit = null;
Commit? commit = null;

//OBJECT URL
if (!string.IsNullOrEmpty(sw.ObjectId))
{
objectId = sw.ObjectId;
objectId = sw.ObjectId.NotNull();
}
//COMMIT URL
else if (!string.IsNullOrEmpty(sw.CommitId))
{
commit = await client.CommitGet(sw.StreamId, sw.CommitId).ConfigureAwait(false);
commit = await client.CommitGet(sw.StreamId, sw.CommitId.NotNull()).ConfigureAwait(false);
objectId = commit.referencedObject;
}
//BRANCH URL OR STREAM URL
else
{
var branchName = string.IsNullOrEmpty(sw.BranchName) ? "main" : sw.BranchName;

var branch = await client.BranchGet(sw.StreamId, branchName, 1).ConfigureAwait(false);
var branch = await client.BranchGet(sw.StreamId, branchName.NotNull(), 1).ConfigureAwait(false);
if (branch.commits.items.Count == 0)
{
throw new SpeckleException("The selected branch has no commits.");
Expand All @@ -94,7 +94,7 @@ public static async Task<Base> Receive(
Analytics.Events.Receive,
new Dictionary<string, object>
{
{ "sourceHostApp", HostApplications.GetHostAppFromString(commit.sourceApplication).Slug },
{ "sourceHostApp", HostApplications.GetHostAppFromString(commit.NotNull().sourceApplication).Slug },
{ "sourceHostAppVersion", commit.sourceApplication }
}
);
Expand Down Expand Up @@ -129,105 +129,6 @@ await client
return receiveRes;
}

/// <summary>
/// Helper method to Send to a Speckle Server.
/// </summary>
/// <param name="stream">Stream URL or Id to send to. If the URL contains branchName, commitId or objectId those will be used, otherwise the latest commit from main will be received.</param>
/// <param name="data">Data to send</param>
/// <param name="account">Account to use. If not provided the default account will be used.</param>
/// <param name="useDefaultCache">Toggle for the default cache. If set to false, it will only send to the provided transports.</param>
/// <param name="onProgressAction">Action invoked on progress iterations.</param>
/// <returns></returns>
public static async Task<string> Send(
this IServerTransportFactory serverTransportFactory,
string stream,
Base data,
string message = "No message",
string sourceApplication = ".net",
int totalChildrenCount = 0,
Account account = null,
bool useDefaultCache = true,
Action<ConcurrentDictionary<string, int>> onProgressAction = null
)
{
var sw = new StreamWrapper(stream);

using var client = new Client(account ?? await sw.GetAccount().ConfigureAwait(false));

using var transport = serverTransportFactory.Create(client.Account, sw.StreamId);
var branchName = string.IsNullOrEmpty(sw.BranchName) ? "main" : sw.BranchName;

var (objectId, _) = await Operations.Send(data, transport, useDefaultCache, onProgressAction).ConfigureAwait(false);

Analytics.TrackEvent(client.Account, Analytics.Events.Send);

return await client
.CommitCreate(
new CommitCreateInput
{
streamId = sw.StreamId,
branchName = branchName,
objectId = objectId,
message = message,
sourceApplication = sourceApplication,
totalChildrenCount = totalChildrenCount
}
)
.ConfigureAwait(false);
}

/// <summary>
///
/// </summary>
/// <param name="slug">The connector slug eg. revit, rhino, etc</param>
/// <returns></returns>
public static async Task<bool> IsConnectorUpdateAvailable(string slug)
{
//when debugging the version is not correct, so don't bother
if (!Analytics.IsReleaseMode)
{
return false;
}

try
{
using HttpClient client = Http.GetHttpProxyClient();
var response = await client.GetStringAsync($"{FEEDS_ENDPOINT}/{slug}.json").ConfigureAwait(false);
var connector = JsonConvert.DeserializeObject<Connector>(response);

var os = Os.Win; //TODO: This won't work for linux
if (RuntimeInformation.IsOSPlatform(OSPlatform.OSX))
{
os = Os.OSX;
}

var versions = connector.Versions.Where(x => x.Os == os).OrderByDescending(x => x.Date).ToList();
var stables = versions.Where(x => !x.Prerelease).ToArray();
if (stables.Length == 0)
{
return false;
}

var latestVersion = new System.Version(stables.First().Number);

var currentVersion = Assembly.GetAssembly(typeof(Helpers)).GetName().Version;

if (latestVersion > currentVersion)
{
return true;
}
}
catch (Exception ex) when (!ex.IsFatal())
{
//.Log.ForContext("slug", slug)
SpeckleLog.Logger.Warning(ex, "Failed to check for connector updates");
}

return false;
}

#nullable enable

/// <inheritdoc cref="TimeAgo(DateTime)"/>
/// <param name="fallback">value to fallback to if the given <paramref name="timestamp"/> is <see langword="null"/></param>
public static string TimeAgo(DateTime? timestamp, string fallback = "Never")
Expand Down
2 changes: 1 addition & 1 deletion src/Speckle.Sdk/Api/Operations/Operations.Receive.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public static async Task<Base> Receive(
string objectId,
ITransport? remoteTransport = null,
ITransport? localTransport = null,
Action<ConcurrentDictionary<string, int>>? onProgressAction = null,
Action<ConcurrentBag<ProgressArgs>>? onProgressAction = null,
Action<int>? onTotalChildrenCountKnown = null,
CancellationToken cancellationToken = default
)
Expand Down
14 changes: 5 additions & 9 deletions src/Speckle.Sdk/Api/Operations/Operations.Send.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public static partial class Operations
/// Sends a Speckle Object to the provided <paramref name="transport"/> and (optionally) the default local cache
/// </summary>
/// <remarks/>
/// <inheritdoc cref="Send(Base, IReadOnlyCollection{ITransport}, Action{ConcurrentDictionary{string,int}}?, CancellationToken)"/>
/// <inheritdoc cref="Send(Base, IReadOnlyCollection{ITransport}, Action{ConcurrentBag{ProgressArgs}}?, CancellationToken)"/>
/// <param name="useDefaultCache">When <see langword="true"/>, an additional <see cref="SQLiteTransport"/> will be included</param>
/// <exception cref="ArgumentNullException">The <paramref name="transport"/> or <paramref name="value"/> was <see langword="null"/></exception>
/// <example><code>
Expand All @@ -25,7 +25,7 @@ public static partial class Operations
Base value,
ITransport transport,
bool useDefaultCache,
Action<ConcurrentDictionary<string, int>>? onProgressAction = null,
Action<ConcurrentBag<ProgressArgs>>? onProgressAction = null,
CancellationToken cancellationToken = default
)
{
Expand Down Expand Up @@ -61,7 +61,7 @@ public static partial class Operations
public static async Task<(string rootObjId, IReadOnlyDictionary<string, ObjectReference> convertedReferences)> Send(
Base value,
IReadOnlyCollection<ITransport> transports,
Action<ConcurrentDictionary<string, int>>? onProgressAction = null,
Action<ConcurrentBag<ProgressArgs>>? onProgressAction = null,
CancellationToken cancellationToken = default
)
{
Expand All @@ -75,11 +75,8 @@ public static partial class Operations
throw new ArgumentException("Expected at least on transport to be specified", nameof(transports));
}

var transportContext = transports.ToDictionary(t => t.TransportName, t => t.TransportContext);

// make sure all logs in the operation have the proper context
using var activity = SpeckleActivityFactory.Start();
activity?.SetTag("transportContext", transportContext);
activity?.SetTag("correlationId", Guid.NewGuid().ToString());
{
var sendTimer = Stopwatch.StartNew();
Expand Down Expand Up @@ -108,8 +105,7 @@ public static partial class Operations
);
activity?.SetTag("serializerElapsed", serializerV2.Elapsed);
SpeckleLog.Logger.Information(
"Finished sending {objectCount} objects after {elapsed}, result {objectId}",
transports.Max(t => t.SavedObjectCount),
"Finished sending objects after {elapsed}, result {objectId}",
sendTimer.Elapsed.TotalSeconds,
rootObjectId
);
Expand Down Expand Up @@ -140,7 +136,7 @@ public static partial class Operations
}
}

/// <returns><inheritdoc cref="Send(Base, IReadOnlyCollection{ITransport}, Action{ConcurrentDictionary{string, int}}?, CancellationToken)"/></returns>
/// <returns><inheritdoc cref="Send(Base, IReadOnlyCollection{ITransport}, Action{ConcurrentBag{ProgressArgs}}?, CancellationToken)"/></returns>
internal static async Task<string> SerializerSend(
Base value,
BaseObjectSerializerV2 serializer,
Expand Down
5 changes: 3 additions & 2 deletions src/Speckle.Sdk/Api/Operations/Operations.Serialize.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using Speckle.Sdk.Logging;
using Speckle.Sdk.Models;
using Speckle.Sdk.Serialisation;
using Speckle.Sdk.Transports;

namespace Speckle.Sdk.Api;

Expand All @@ -13,7 +14,7 @@ public static partial class Operations
/// <remarks>
/// If you want to save and persist an object to Speckle Transport or Server,
/// please use any of the "Send" methods.
/// <see cref="Send(Base,Speckle.Sdk.Transports.ITransport,bool,System.Action{System.Collections.Concurrent.ConcurrentDictionary{string,int}}?,System.Threading.CancellationToken)"/>
/// <see cref="Send(Base,Speckle.Sdk.Transports.ITransport,bool,System.Action{System.Collections.Concurrent.ConcurrentBag{ProgressArgs}}?,System.Threading.CancellationToken)"/>
/// </remarks>
/// <param name="value">The object to serialise</param>
/// <param name="cancellationToken"></param>
Expand All @@ -27,7 +28,7 @@ public static string Serialize(Base value, CancellationToken cancellationToken =
/// <remarks>
/// Note: if you want to pull an object from a Speckle Transport or Server,
/// please use
/// <see cref="Receive(string,Speckle.Sdk.Transports.ITransport?,Speckle.Sdk.Transports.ITransport?,System.Action{System.Collections.Concurrent.ConcurrentDictionary{string,int}}?,System.Action{int}?,System.Threading.CancellationToken)"/>
/// <see cref="Receive(string,Speckle.Sdk.Transports.ITransport?,Speckle.Sdk.Transports.ITransport?,System.Action{System.Collections.Concurrent.ConcurrentBag{ProgressArgs}}?,System.Action{int}?,System.Threading.CancellationToken)"/>
/// </remarks>
/// <param name="value">The json string representation of a speckle object that you want to deserialize</param>
/// <param name="cancellationToken"></param>
Expand Down
15 changes: 5 additions & 10 deletions src/Speckle.Sdk/Api/Operations/Operations.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections.Concurrent;
using Speckle.Sdk.Transports;

namespace Speckle.Sdk.Api;

Expand All @@ -14,23 +15,17 @@ public static partial class Operations
/// </summary>
/// <param name="onProgressAction"></param>
/// <returns></returns>
private static Action<string, int>? GetInternalProgressAction(
Action<ConcurrentDictionary<string, int>>? onProgressAction
)
private static Action<ProgressArgs>? GetInternalProgressAction(Action<ConcurrentBag<ProgressArgs>>? onProgressAction)
{
if (onProgressAction is null)
{
return null;
}

var localProgressDict = new ConcurrentDictionary<string, int>();

return (name, processed) =>
return (args) =>
{
if (!localProgressDict.TryAdd(name, processed))
{
localProgressDict[name] += processed;
}
var localProgressDict = new ConcurrentBag<ProgressArgs>();
localProgressDict.Add(args);
onProgressAction.Invoke(localProgressDict);
};
Expand Down
12 changes: 6 additions & 6 deletions src/Speckle.Sdk/Credentials/AccountManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public static class AccountManager
/// <returns></returns>
public static async Task<ServerInfo> GetServerInfo(Uri server, CancellationToken cancellationToken = default)
{
using var httpClient = Http.GetHttpProxyClient();
using var httpClient = Http.GetHttpProxyClient(null, null);

using var gqlClient = new GraphQLHttpClient(
new GraphQLHttpClientOptions
Expand Down Expand Up @@ -102,7 +102,7 @@ public static async Task<UserInfo> GetUserInfo(
CancellationToken cancellationToken = default
)
{
using var httpClient = Http.GetHttpProxyClient();
using var httpClient = Http.GetHttpProxyClient(null, null);
Http.AddAuthHeader(httpClient, token);

using var gqlClient = new GraphQLHttpClient(
Expand Down Expand Up @@ -149,7 +149,7 @@ internal static async Task<ActiveUserServerInfoResponse> GetUserServerInfo(
{
try
{
using var httpClient = Http.GetHttpProxyClient();
using var httpClient = Http.GetHttpProxyClient(null, null);
Http.AddAuthHeader(httpClient, token);

using var client = new GraphQLHttpClient(
Expand Down Expand Up @@ -751,7 +751,7 @@ private static async Task<TokenExchangeResponse> GetToken(string accessCode, str
{
try
{
using var client = Http.GetHttpProxyClient();
using var client = Http.GetHttpProxyClient(null, null);

var body = new
{
Expand Down Expand Up @@ -779,7 +779,7 @@ private static async Task<TokenExchangeResponse> GetRefreshedToken(string refres
{
try
{
using var client = Http.GetHttpProxyClient();
using var client = Http.GetHttpProxyClient(null, null);

var body = new
{
Expand Down Expand Up @@ -811,7 +811,7 @@ private static async Task<TokenExchangeResponse> GetRefreshedToken(string refres
/// <exception cref="HttpRequestException">Request to <paramref name="server"/> failed to send or response was not successful</exception>
private static async Task<bool> IsFrontend2Server(Uri server)
{
using var httpClient = Http.GetHttpProxyClient();
using var httpClient = Http.GetHttpProxyClient(null, null);

var response = await Http.HttpPing(server).ConfigureAwait(false);

Expand Down
Loading

0 comments on commit dc4da49

Please sign in to comment.