Skip to content

Commit

Permalink
Fix deserialization progress reporting (#137)
Browse files Browse the repository at this point in the history
* Fix progress reporting for deserialization, remove children known count

* fix tests
  • Loading branch information
adamhathcock authored Oct 9, 2024
1 parent 0d7805d commit e14913b
Show file tree
Hide file tree
Showing 12 changed files with 36 additions and 136 deletions.
41 changes: 7 additions & 34 deletions src/Speckle.Sdk/Api/Operations/Operations.Receive.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
using Speckle.Sdk.Logging;
using Speckle.Sdk.Models;
using Speckle.Sdk.Serialisation;
using Speckle.Sdk.Serialisation.Utilities;
using Speckle.Sdk.Transports;

namespace Speckle.Sdk.Api;
Expand All @@ -24,7 +23,6 @@ public partial class Operations
/// <param name="remoteTransport">The remote transport (slower). If <see langword="null"/>, will assume all objects are present in <paramref name="localTransport"/></param>
/// <param name="localTransport">The local transport (faster). If <see langword="null"/>, will use a default <see cref="SQLiteTransport"/> cache</param>
/// <param name="onProgressAction">Action invoked on progress iterations</param>
/// <param name="onTotalChildrenCountKnown">Action invoked once the total count of objects is known</param>
/// <param name="cancellationToken"></param>
/// <exception cref="TransportException">Failed to retrieve objects from the provided transport(s)</exception>
/// <exception cref="SpeckleDeserializeException">Deserialization of the requested object(s) failed</exception>
Expand All @@ -35,7 +33,6 @@ public async Task<Base> Receive(
ITransport? remoteTransport = null,
ITransport? localTransport = null,
IProgress<ProgressArgs>? onProgressAction = null,
Action<int>? onTotalChildrenCountKnown = null,
CancellationToken cancellationToken = default
)
{
Expand All @@ -52,14 +49,7 @@ public async Task<Base> Receive(
using IDisposable? d1 = UseDefaultTransportIfNull(localTransport, out localTransport);
receiveActivity?.SetTags("localTransportContext", localTransport.TransportContext);

var result = await ReceiveImpl(
objectId,
remoteTransport,
localTransport,
onProgressAction,
onTotalChildrenCountKnown,
cancellationToken
)
var result = await ReceiveImpl(objectId, remoteTransport, localTransport, onProgressAction, cancellationToken)
.ConfigureAwait(false);

receiveActivity?.SetStatus(SdkActivityStatusCode.Ok);
Expand All @@ -73,13 +63,12 @@ public async Task<Base> Receive(
}
}

/// <inheritdoc cref="Receive(string,ITransport?,ITransport?,IProgress{ProgressArgs}?,Action{int}?,CancellationToken)"/>
/// <inheritdoc cref="Receive(string,ITransport?,ITransport?,IProgress{ProgressArgs}?,CancellationToken)"/>
private async Task<Base> ReceiveImpl(
string objectId,
ITransport? remoteTransport,
ITransport localTransport,
IProgress<ProgressArgs>? internalProgressAction,
Action<int>? onTotalChildrenCountKnown,
CancellationToken cancellationToken
)
{
Expand All @@ -105,7 +94,7 @@ CancellationToken cancellationToken
};

// Try Local Receive
string? objString = await LocalReceive(objectId, localTransport, onTotalChildrenCountKnown).ConfigureAwait(false);
string? objString = await LocalReceive(objectId, localTransport).ConfigureAwait(false);

if (objString is null)
{
Expand All @@ -123,8 +112,7 @@ CancellationToken cancellationToken
remoteTransport.TransportName
);

objString = await RemoteReceive(objectId, remoteTransport, localTransport, onTotalChildrenCountKnown)
.ConfigureAwait(false);
objString = await RemoteReceive(objectId, remoteTransport, localTransport).ConfigureAwait(false);
}

using var serializerActivity = activityFactory.Start();
Expand All @@ -139,26 +127,15 @@ CancellationToken cancellationToken
/// </summary>
/// <param name="objectId"></param>
/// <param name="localTransport"></param>
/// <param name="onTotalChildrenCountKnown"></param>
/// <returns></returns>
/// <exception cref="SpeckleDeserializeException"></exception>
internal static async Task<string?> LocalReceive(
string objectId,
ITransport localTransport,
Action<int>? onTotalChildrenCountKnown
)
internal static async Task<string?> LocalReceive(string objectId, ITransport localTransport)
{
string? objString = await localTransport.GetObject(objectId).ConfigureAwait(false);
if (objString is null)
{
return null;
}

// Shoot out the total children count, wasteful
var count = ClosureParser.GetClosures(objString).Count;

onTotalChildrenCountKnown?.Invoke(count);

return objString;
}

Expand All @@ -169,19 +146,15 @@ CancellationToken cancellationToken
/// <param name="objectId"></param>
/// <param name="remoteTransport"></param>
/// <param name="localTransport"></param>
/// <param name="onTotalChildrenCountKnown"></param>
/// <returns></returns>
/// <exception cref="TransportException">Remote transport was not specified</exception>
private static async Task<string> RemoteReceive(
string objectId,
ITransport remoteTransport,
ITransport localTransport,
Action<int>? onTotalChildrenCountKnown
ITransport localTransport
)
{
var objString = await remoteTransport
.CopyObjectAndChildren(objectId, localTransport, onTotalChildrenCountKnown)
.ConfigureAwait(false);
var objString = await remoteTransport.CopyObjectAndChildren(objectId, localTransport).ConfigureAwait(false);

// DON'T THINK THIS IS NEEDED CopyObjectAndChildren should call this
// Wait for the local transport to finish "writing" - in this case, it signifies that the remote transport has done pushing copying objects into it. (TODO: I can see some scenarios where latency can screw things up, and we should rather wait on the remote transport).
Expand Down
2 changes: 1 addition & 1 deletion src/Speckle.Sdk/Api/Operations/Operations.Serialize.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public string Serialize(Base value, CancellationToken cancellationToken = defaul
/// <remarks>
/// Note: if you want to pull an object from a Speckle Transport or Server,
/// please use
/// <see cref="Receive(string,Speckle.Sdk.Transports.ITransport?,Speckle.Sdk.Transports.ITransport?,System.IProgress{ProgressArgs}?,System.Action{int}?,System.Threading.CancellationToken)"/>
/// <see cref="Receive(string,Speckle.Sdk.Transports.ITransport?,Speckle.Sdk.Transports.ITransport?,System.IProgress{ProgressArgs}?,System.Threading.CancellationToken)"/>
/// </remarks>
/// <param name="value">The json string representation of a speckle object that you want to deserialize</param>
/// <param name="cancellationToken"></param>
Expand Down
49 changes: 16 additions & 33 deletions src/Speckle.Sdk/Serialisation/SpeckleObjectDeserializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ public sealed class SpeckleObjectDeserializer
private readonly object?[] _invokeNull = [null];

// id -> Base if already deserialized or id -> ValueTask<object> if was handled by a bg thread
private ConcurrentDictionary<string, object?>? _deserializedObjects;
private readonly ConcurrentDictionary<string, object?> _deserializedObjects = new(StringComparer.Ordinal);
private long _total;

/// <summary>
/// Property that describes the type of the object.
Expand All @@ -33,10 +34,6 @@ public sealed class SpeckleObjectDeserializer

public IProgress<ProgressArgs>? OnProgressAction { get; set; }

private long _currentCount;
private readonly HashSet<string> _ids = new();
private long _processedCount;

public string? BlobStorageFolder { get; set; }

/// <param name="rootObjectJson">The JSON string of the object to be deserialized <see cref="Base"/></param>
Expand Down Expand Up @@ -65,15 +62,12 @@ public async ValueTask<Base> DeserializeAsync([NotNull] string? rootObjectJson)
}

_isBusy = true;
_deserializedObjects = new(StringComparer.Ordinal);
_currentCount = 0;

var result = (Base)await DeserializeJsonAsyncInternal(rootObjectJson).NotNull().ConfigureAwait(false);
return result;
}
finally
{
_deserializedObjects = null;
_isBusy = false;
}
}
Expand All @@ -98,10 +92,7 @@ public async ValueTask<Base> DeserializeAsync([NotNull] string? rootObjectJson)
throw new SpeckleDeserializeException("Failed to deserialize", ex);
}

_processedCount++;
OnProgressAction?.Report(
new ProgressArgs(ProgressEvent.DeserializeObject, _currentCount, _ids.Count, _processedCount)
);
OnProgressAction?.Report(new ProgressArgs(ProgressEvent.DeserializeObject, _deserializedObjects.Count, _total));

return converted;
}
Expand Down Expand Up @@ -142,18 +133,18 @@ public async ValueTask<Base> DeserializeAsync([NotNull] string? rootObjectJson)
{
reader.Read(); //goes to prop value
var closures = ClosureParser.GetClosures(reader);
foreach (var closure in closures)
if (closures.Any())
{
_ids.Add(closure.Item1);
_total = closures.Select(x => x.Item1).Concat(_deserializedObjects.Keys).Distinct().Count();
foreach (var closure in closures)
{
string objId = closure.Item1;
//don't do anything with return value but later check if null
// https://linear.app/speckle/issue/CXPLA-54/when-deserializing-dont-allow-closures-that-arent-downloadable
await TryGetDeserializedAsync(objId).ConfigureAwait(false);
}
}

foreach (var closure in closures)
{
string objId = closure.Item1;
//don't do anything with return value but later check if null
// https://linear.app/speckle/issue/CXPLA-54/when-deserializing-dont-allow-closures-that-arent-downloadable
await TryGetDeserializedAsync(objId).ConfigureAwait(false);
}
reader.Read(); //goes to next
continue;
}
Expand Down Expand Up @@ -203,10 +194,7 @@ public async ValueTask<Base> DeserializeAsync([NotNull] string? rootObjectJson)
throw new SpeckleDeserializeException("Failed to deserialize reference object", ex);
}

if (_deserializedObjects.TryAdd(objId, deserialized))
{
_currentCount++;
}
_deserializedObjects.TryAdd(objId, deserialized);
}
if (deserialized is ValueTask<object> valueTask)
{
Expand All @@ -219,11 +207,9 @@ public async ValueTask<Base> DeserializeAsync([NotNull] string? rootObjectJson)
throw new SpeckleDeserializeException("Failed to deserialize reference object", ex);
}

if (_deserializedObjects.TryAdd(objId, deserialized))
{
_currentCount++;
}
_deserializedObjects.TryAdd(objId, deserialized);
}

if (deserialized != null)
{
return deserialized;
Expand All @@ -238,10 +224,7 @@ public async ValueTask<Base> DeserializeAsync([NotNull] string? rootObjectJson)

deserialized = await DeserializeJsonAsyncInternal(objectJson).ConfigureAwait(false);

if (_deserializedObjects.NotNull().TryAdd(objId, deserialized))
{
_currentCount++;
}
_deserializedObjects.TryAdd(objId, deserialized);

return deserialized;
}
Expand Down
8 changes: 2 additions & 6 deletions src/Speckle.Sdk/Transports/DiskTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,10 @@ public Task WriteComplete()
return Task.CompletedTask;
}

public async Task<string> CopyObjectAndChildren(
string id,
ITransport targetTransport,
Action<int>? onTotalChildrenCountKnown = null
)
public async Task<string> CopyObjectAndChildren(string id, ITransport targetTransport)
{
string res = await TransportHelpers
.CopyObjectAndChildrenAsync(id, this, targetTransport, onTotalChildrenCountKnown, CancellationToken)
.CopyObjectAndChildrenAsync(id, this, targetTransport, CancellationToken)
.ConfigureAwait(false);
return res;
}
Expand Down
7 changes: 1 addition & 6 deletions src/Speckle.Sdk/Transports/ITransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,16 +84,11 @@ public interface ITransport
/// </summary>
/// <param name="id">The id of the object you want to copy.</param>
/// <param name="targetTransport">The transport you want to copy the object to.</param>
/// <param name="onTotalChildrenCountKnown">(Optional) an <see cref="Action{T}"/> that will be invoked once, when the number of object children to be copied over is known.</param>
/// <returns>The string representation of the root object.</returns>
/// <exception cref="ArgumentException">The provided arguments are not valid</exception>
/// <exception cref="TransportException">The transport could not complete the operation</exception>
/// <exception cref="OperationCanceledException"><see cref="CancellationToken"/> requested cancel</exception>
public Task<string> CopyObjectAndChildren(
string id,
ITransport targetTransport,
Action<int>? onTotalChildrenCountKnown = null
);
public Task<string> CopyObjectAndChildren(string id, ITransport targetTransport);

/// <summary>
/// Checks if objects are present in the transport
Expand Down
8 changes: 2 additions & 6 deletions src/Speckle.Sdk/Transports/MemoryTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,10 @@ public void SaveObject(string id, string serializedObject)
return Task.FromResult(ret);
}

public async Task<string> CopyObjectAndChildren(
string id,
ITransport targetTransport,
Action<int>? onTotalChildrenCountKnown = null
)
public async Task<string> CopyObjectAndChildren(string id, ITransport targetTransport)
{
string res = await TransportHelpers
.CopyObjectAndChildrenAsync(id, this, targetTransport, onTotalChildrenCountKnown, CancellationToken)
.CopyObjectAndChildrenAsync(id, this, targetTransport, CancellationToken)
.ConfigureAwait(false);
return res;
}
Expand Down
8 changes: 2 additions & 6 deletions src/Speckle.Sdk/Transports/SQLiteTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -427,14 +427,10 @@ public void SaveObjectSync(string hash, string serializedObject)
return null; // pass on the duty of null checks to consumers
}

public async Task<string> CopyObjectAndChildren(
string id,
ITransport targetTransport,
Action<int>? onTotalChildrenCountKnown = null
)
public async Task<string> CopyObjectAndChildren(string id, ITransport targetTransport)
{
string res = await TransportHelpers
.CopyObjectAndChildrenAsync(id, this, targetTransport, onTotalChildrenCountKnown, CancellationToken)
.CopyObjectAndChildrenAsync(id, this, targetTransport, CancellationToken)
.ConfigureAwait(false);
return res;
}
Expand Down
8 changes: 1 addition & 7 deletions src/Speckle.Sdk/Transports/ServerTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,7 @@ public void Dispose()
public IProgress<ProgressArgs>? OnProgressAction { get; set; }
public TimeSpan Elapsed { get; private set; } = TimeSpan.Zero;

public async Task<string> CopyObjectAndChildren(
string id,
ITransport targetTransport,
Action<int>? onTotalChildrenCountKnown = null
)
public async Task<string> CopyObjectAndChildren(string id, ITransport targetTransport)
{
if (string.IsNullOrEmpty(id))
{
Expand All @@ -145,8 +141,6 @@ public async Task<string> CopyObjectAndChildren(
var childrenIds = allIds.Where(x => !x.Contains("blob:"));
var blobIds = allIds.Where(x => x.Contains("blob:")).Select(x => x.Remove(0, 5));

onTotalChildrenCountKnown?.Invoke(allIds.Count);

//
// Objects download
//
Expand Down
3 changes: 0 additions & 3 deletions src/Speckle.Sdk/Transports/TransportHelpers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ public static async Task<string> CopyObjectAndChildrenAsync(
string id,
ITransport sourceTransport,
ITransport targetTransport,
Action<int>? onTotalChildrenCountKnown,
CancellationToken cancellationToken
)
{
Expand All @@ -32,8 +31,6 @@ CancellationToken cancellationToken

var closures = ClosureParser.GetChildrenIds(parent).ToList();

onTotalChildrenCountKnown?.Invoke(closures.Count);

int i = 0;
foreach (var closure in closures)
{
Expand Down
7 changes: 2 additions & 5 deletions tests/Speckle.Sdk.Serialization.Tests/TestTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,8 @@ public string TransportName

public Task<string?> GetObject(string id) => Task.FromResult(Objects.TryGetValue(id, out string? o) ? o : null);

public Task<string> CopyObjectAndChildren(
string id,
ITransport targetTransport,
Action<int>? onTotalChildrenCountKnown = null
) => throw new NotImplementedException();
public Task<string> CopyObjectAndChildren(string id, ITransport targetTransport) =>
throw new NotImplementedException();

public Task<Dictionary<string, bool>> HasObjects(IReadOnlyList<string> objectIds) =>
throw new NotImplementedException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,8 @@ public Task WriteComplete()

public Task<string> GetObject(string id) => throw new NotImplementedException();

public Task<string> CopyObjectAndChildren(
string id,
ITransport targetTransport,
Action<int> onTotalChildrenCountKnown = null
) => throw new NotImplementedException();
public Task<string> CopyObjectAndChildren(string id, ITransport targetTransport) =>
throw new NotImplementedException();

public Task<Dictionary<string, bool>> HasObjects(IReadOnlyList<string> objectIds) =>
throw new NotImplementedException();
Expand Down
Loading

0 comments on commit e14913b

Please sign in to comment.