diff --git a/Directory.Packages.props b/Directory.Packages.props index 80bd0db1..97872873 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -13,13 +13,13 @@ - + - + @@ -28,9 +28,10 @@ + - + \ No newline at end of file diff --git a/src/Speckle.Objects/packages.lock.json b/src/Speckle.Objects/packages.lock.json index a8bfa0e1..bf3c01fd 100644 --- a/src/Speckle.Objects/packages.lock.json +++ b/src/Speckle.Objects/packages.lock.json @@ -484,7 +484,6 @@ "type": "Project", "dependencies": { "GraphQL.Client": "[6.0.0, )", - "Microsoft.Bcl.AsyncInterfaces": "[5.0.0, )", "Microsoft.CSharp": "[4.7.0, )", "Microsoft.Data.Sqlite": "[7.0.7, )", "Microsoft.Extensions.DependencyInjection.Abstractions": "[2.2.0, )", @@ -508,12 +507,6 @@ "System.Reactive": "5.0.0" } }, - "Microsoft.Bcl.AsyncInterfaces": { - "type": "CentralTransitive", - "requested": "[5.0.0, )", - "resolved": "5.0.0", - "contentHash": "W8DPQjkMScOMTtJbPwmPyj9c3zYSFGawDW3jwlBOOsnY+EzZFLgNQ/UMkK35JmkNOVPdCyPr2Tw7Vv9N+KA3ZQ==" - }, "Microsoft.CSharp": { "type": "CentralTransitive", "requested": "[4.7.0, )", diff --git a/src/Speckle.Sdk.Dependencies/Serialization/ChannelLoader.cs b/src/Speckle.Sdk.Dependencies/Serialization/ChannelLoader.cs new file mode 100644 index 00000000..d7bc96df --- /dev/null +++ b/src/Speckle.Sdk.Dependencies/Serialization/ChannelLoader.cs @@ -0,0 +1,33 @@ +using Open.ChannelExtensions; + +namespace Speckle.Sdk.Serialisation.V2.Receive; + +public abstract class ChannelLoader +{ + private const int HTTP_ID_CHUNK_SIZE = 500; + private const int MAX_PARALLELISM_HTTP = 4; + + protected async Task GetAndCache(IEnumerable allChildrenIds, CancellationToken cancellationToken = default) => + await allChildrenIds + .ToChannel(cancellationToken: cancellationToken) + .Pipe(Environment.ProcessorCount, CheckCache, cancellationToken: cancellationToken) + .Filter(x => x is not null) + .Batch(HTTP_ID_CHUNK_SIZE) + .WithTimeout(TimeSpan.FromSeconds(2)) + .PipeAsync( + MAX_PARALLELISM_HTTP, + async x => await DownloadAndCache(x).ConfigureAwait(false), + -1, + false, + cancellationToken + ) + .Join() + .ReadAllConcurrently(Environment.ProcessorCount, SaveToCache, cancellationToken) + .ConfigureAwait(false); + + public abstract string? CheckCache(string id); + + public abstract Task> DownloadAndCache(List ids); + + public abstract void SaveToCache((string, string) x); +} diff --git a/src/Speckle.Sdk.Dependencies/Speckle.Sdk.Dependencies.csproj b/src/Speckle.Sdk.Dependencies/Speckle.Sdk.Dependencies.csproj index 3b0d9060..9299b933 100644 --- a/src/Speckle.Sdk.Dependencies/Speckle.Sdk.Dependencies.csproj +++ b/src/Speckle.Sdk.Dependencies/Speckle.Sdk.Dependencies.csproj @@ -1,4 +1,4 @@ - + netstandard2.0;net8.0 @@ -27,7 +27,9 @@ - + + + diff --git a/src/Speckle.Sdk.Dependencies/packages.lock.json b/src/Speckle.Sdk.Dependencies/packages.lock.json index 0c481e76..80f29e97 100644 --- a/src/Speckle.Sdk.Dependencies/packages.lock.json +++ b/src/Speckle.Sdk.Dependencies/packages.lock.json @@ -42,6 +42,17 @@ "Microsoft.NETCore.Platforms": "1.1.0" } }, + "Open.ChannelExtensions": { + "type": "Direct", + "requested": "[8.5.0, )", + "resolved": "8.5.0", + "contentHash": "dKD2iNfUYw+aOvwM2vCnD+q6JCtHiabkufKM1GateedRzcgv0RrtA4MoJI+7Y8N21R5A+wUA+j6P88g6mXPavA==", + "dependencies": { + "Microsoft.Bcl.AsyncInterfaces": "8.0.0", + "System.Collections.Immutable": "8.0.0", + "System.Threading.Channels": "8.0.0" + } + }, "Polly": { "type": "Direct", "requested": "[7.2.3, )", @@ -75,6 +86,15 @@ "resolved": "0.9.6", "contentHash": "HKH7tYrYYlCK1ct483hgxERAdVdMtl7gUKW9ijWXxA1UsYR4Z+TrRHYmzZ9qmpu1NnTycSrp005NYM78GDKV1w==" }, + "System.Threading.Channels": { + "type": "Direct", + "requested": "[8.0.0, )", + "resolved": "8.0.0", + "contentHash": "CMaFr7v+57RW7uZfZkPExsPB6ljwzhjACWW1gfU35Y56rk72B/Wu+sTqxVmGSk4SFUlPc3cjeKND0zktziyjBA==", + "dependencies": { + "System.Threading.Tasks.Extensions": "4.5.4" + } + }, "ILRepack": { "type": "Transitive", "resolved": "2.0.33", @@ -94,6 +114,57 @@ "type": "Transitive", "resolved": "8.0.0", "contentHash": "dk9JPxTCIevS75HyEQ0E4OVAFhB2N+V9ShCXf8Q6FkUQZDkgLI12y679Nym1YqsiSysuQskT7Z+6nUf3yab6Vw==" + }, + "System.Buffers": { + "type": "Transitive", + "resolved": "4.5.1", + "contentHash": "Rw7ijyl1qqRS0YQD/WycNst8hUUMgrMH4FCn1nNm27M4VxchZ1js3fVjQaANHO5f3sN4isvP4a+Met9Y4YomAg==" + }, + "System.Collections.Immutable": { + "type": "Transitive", + "resolved": "8.0.0", + "contentHash": "AurL6Y5BA1WotzlEvVaIDpqzpIPvYnnldxru8oXJU2yFxFUy3+pNXjXd1ymO+RA0rq0+590Q8gaz2l3Sr7fmqg==", + "dependencies": { + "System.Memory": "4.5.5", + "System.Runtime.CompilerServices.Unsafe": "6.0.0" + } + }, + "System.Memory": { + "type": "Transitive", + "resolved": "4.5.5", + "contentHash": "XIWiDvKPXaTveaB7HVganDlOCRoj03l+jrwNvcge/t8vhGYKvqV+dMv6G4SAX2NoNmN0wZfVPTAlFwZcZvVOUw==", + "dependencies": { + "System.Buffers": "4.5.1", + "System.Numerics.Vectors": "4.4.0", + "System.Runtime.CompilerServices.Unsafe": "4.5.3" + } + }, + "System.Numerics.Vectors": { + "type": "Transitive", + "resolved": "4.4.0", + "contentHash": "UiLzLW+Lw6HLed1Hcg+8jSRttrbuXv7DANVj0DkL9g6EnnzbL75EB7EWsw5uRbhxd/4YdG8li5XizGWepmG3PQ==" + }, + "System.Runtime.CompilerServices.Unsafe": { + "type": "Transitive", + "resolved": "6.0.0", + "contentHash": "/iUeP3tq1S0XdNNoMz5C9twLSrM/TH+qElHkXWaPvuNOt+99G75NrV0OS2EqHx5wMN7popYjpc8oTjC1y16DLg==" + }, + "System.Threading.Tasks.Extensions": { + "type": "Transitive", + "resolved": "4.5.4", + "contentHash": "zteT+G8xuGu6mS+mzDzYXbzS7rd3K6Fjb9RiZlYlJPam2/hU7JCBZBVEcywNuR+oZ1ncTvc/cq0faRr3P01OVg==", + "dependencies": { + "System.Runtime.CompilerServices.Unsafe": "4.5.3" + } + }, + "Microsoft.Bcl.AsyncInterfaces": { + "type": "CentralTransitive", + "requested": "[5.0.0, )", + "resolved": "8.0.0", + "contentHash": "3WA9q9yVqJp222P3x1wYIGDAkpjAku0TMUaaQV22g6L67AI0LdOIrVS7Ht2vJfLHGSPVuqN94vIr15qn+HEkHw==", + "dependencies": { + "System.Threading.Tasks.Extensions": "4.5.4" + } } }, "net8.0": { @@ -128,6 +199,12 @@ "Microsoft.SourceLink.Common": "8.0.0" } }, + "Open.ChannelExtensions": { + "type": "Direct", + "requested": "[8.5.0, )", + "resolved": "8.5.0", + "contentHash": "dKD2iNfUYw+aOvwM2vCnD+q6JCtHiabkufKM1GateedRzcgv0RrtA4MoJI+7Y8N21R5A+wUA+j6P88g6mXPavA==" + }, "Polly": { "type": "Direct", "requested": "[7.2.3, )", @@ -161,6 +238,12 @@ "resolved": "0.9.6", "contentHash": "HKH7tYrYYlCK1ct483hgxERAdVdMtl7gUKW9ijWXxA1UsYR4Z+TrRHYmzZ9qmpu1NnTycSrp005NYM78GDKV1w==" }, + "System.Threading.Channels": { + "type": "Direct", + "requested": "[8.0.0, )", + "resolved": "8.0.0", + "contentHash": "CMaFr7v+57RW7uZfZkPExsPB6ljwzhjACWW1gfU35Y56rk72B/Wu+sTqxVmGSk4SFUlPc3cjeKND0zktziyjBA==" + }, "ILRepack": { "type": "Transitive", "resolved": "2.0.33", diff --git a/src/Speckle.Sdk/Api/GraphQL/Client.cs b/src/Speckle.Sdk/Api/GraphQL/Client.cs index 37fa525b..2dadff62 100644 --- a/src/Speckle.Sdk/Api/GraphQL/Client.cs +++ b/src/Speckle.Sdk/Api/GraphQL/Client.cs @@ -1,5 +1,4 @@ using System.Diagnostics.CodeAnalysis; -using System.Dynamic; using System.Net.WebSockets; using System.Reflection; using GraphQL; @@ -175,46 +174,11 @@ internal void MaybeThrowFromGraphQLErrors(GraphQLRequest request, GraphQLResp } } - private Dictionary ConvertExpandoToDict(ExpandoObject expando) - { - var variables = new Dictionary(); - foreach (KeyValuePair kvp in expando) - { - object? value; - if (kvp.Value is ExpandoObject ex) - { - value = ConvertExpandoToDict(ex); - } - else - { - value = kvp.Value; - } - - variables[kvp.Key] = value; - } - return variables; - } - - /* private ILogEventEnricher[] CreateEnrichers(GraphQLRequest request) - { - // i know this is double (de)serializing, but we need a recursive convert to - // dict here - var expando = JsonConvert.DeserializeObject(JsonConvert.SerializeObject(request.Variables)); - var variables = request.Variables != null && expando != null ? ConvertExpandoToDict(expando) : null; - return new ILogEventEnricher[] - { - new PropertyEnricher("serverUrl", ServerUrl), - new PropertyEnricher("graphqlQuery", request.Query), - new PropertyEnricher("graphqlVariables", variables), - new PropertyEnricher("resultType", typeof(T).Name) - }; - }*/ - IDisposable ISpeckleGraphQLClient.SubscribeTo(GraphQLRequest request, Action callback) => SubscribeTo(request, callback); /// - internal IDisposable SubscribeTo(GraphQLRequest request, Action callback) + private IDisposable SubscribeTo(GraphQLRequest request, Action callback) { //using (LogContext.Push(CreateEnrichers(request))) { diff --git a/src/Speckle.Sdk/Common/HashCode.cs b/src/Speckle.Sdk/Common/HashCode.cs index 43108c61..be44eed1 100644 --- a/src/Speckle.Sdk/Common/HashCode.cs +++ b/src/Speckle.Sdk/Common/HashCode.cs @@ -63,7 +63,7 @@ public static HashCode OfEach(IEnumerable? items) => /// The type of the item. /// The item. /// The new hash code. - public HashCode And(T item) => new HashCode(CombineHashCodes(this._value, GetHashCode(item))); + public HashCode And(T item) => new HashCode(CombineHashCodes(_value, GetHashCode(item))); /// /// Adds the hash code of the specified items in the collection. @@ -75,21 +75,21 @@ public HashCode AndEach(IEnumerable? items) { if (items == null) { - return new HashCode(this._value); + return new HashCode(_value); } - return new HashCode(GetHashCode(items, this._value)); + return new HashCode(GetHashCode(items, _value)); } /// - public bool Equals(HashCode other) => this._value.Equals(other._value); + public bool Equals(HashCode other) => _value.Equals(other._value); /// public override bool Equals(object? obj) { if (obj is HashCode code) { - return this.Equals(code); + return Equals(code); } return false; diff --git a/src/Speckle.Sdk/Credentials/Account.cs b/src/Speckle.Sdk/Credentials/Account.cs index 98cd30e7..f1a529b8 100644 --- a/src/Speckle.Sdk/Credentials/Account.cs +++ b/src/Speckle.Sdk/Credentials/Account.cs @@ -89,7 +89,7 @@ public override bool Equals(object obj) public override int GetHashCode() { -#if NETSTANDARD2_0 +#if NETSTANDARD2_0 return Speckle.Sdk.Common.HashCode.Of(userInfo.email).And(serverInfo.url); #else return HashCode.Combine(userInfo.email, serverInfo.url); diff --git a/src/Speckle.Sdk/Credentials/AccountManager.cs b/src/Speckle.Sdk/Credentials/AccountManager.cs index c5050706..70d55827 100644 --- a/src/Speckle.Sdk/Credentials/AccountManager.cs +++ b/src/Speckle.Sdk/Credentials/AccountManager.cs @@ -835,12 +835,12 @@ private async Task IsFrontend2Server(Uri server) private static string GenerateChallenge() { -#if NETSTANDARD2_0 +#if NET8_0 + byte[] challengeData = RandomNumberGenerator.GetBytes(32); +#else using RNGCryptoServiceProvider rng = new(); byte[] challengeData = new byte[32]; rng.GetBytes(challengeData); -#else - byte[] challengeData = RandomNumberGenerator.GetBytes(32); #endif //escaped chars like % do not play nice with the server return Regex.Replace(Convert.ToBase64String(challengeData), @"[^\w\.@-]", ""); diff --git a/src/Speckle.Sdk/Helpers/PropNameValidator.cs b/src/Speckle.Sdk/Helpers/PropNameValidator.cs index c97ee9c5..c147e444 100644 --- a/src/Speckle.Sdk/Helpers/PropNameValidator.cs +++ b/src/Speckle.Sdk/Helpers/PropNameValidator.cs @@ -35,9 +35,9 @@ public static bool IsChunkable(string propName, out int chunkSize) [Pure] public static bool IsDetached(string propName) => -#if NET5_0_OR_GREATER - propName.StartsWith('@'); -#else +#if NETSTANDARD2_0 propName.StartsWith("@"); +#else + propName.StartsWith('@'); #endif } diff --git a/src/Speckle.Sdk/Models/GraphTraversal/RuleBuilder.cs b/src/Speckle.Sdk/Models/GraphTraversal/RuleBuilder.cs index e5f175b5..435e35b5 100644 --- a/src/Speckle.Sdk/Models/GraphTraversal/RuleBuilder.cs +++ b/src/Speckle.Sdk/Models/GraphTraversal/RuleBuilder.cs @@ -20,7 +20,7 @@ private TraversalRule() public ITraversalBuilderReturn ContinueTraversing(SelectMembers membersToTraverse) { - this._membersToTraverse = membersToTraverse; + _membersToTraverse = membersToTraverse; return this; } diff --git a/src/Speckle.Sdk/Serialisation/V2/AsyncExtensions.cs b/src/Speckle.Sdk/Serialisation/V2/AsyncExtensions.cs index 849ef5af..f2ae05f8 100644 --- a/src/Speckle.Sdk/Serialisation/V2/AsyncExtensions.cs +++ b/src/Speckle.Sdk/Serialisation/V2/AsyncExtensions.cs @@ -1,6 +1,4 @@ -using System.Diagnostics; - -namespace Speckle.Sdk.Serialisation.V2; +namespace Speckle.Sdk.Serialisation.V2; public static class AsyncExtensions { @@ -13,200 +11,4 @@ public static async ValueTask FirstAsync(this IAsyncEnumerable SelectManyAsync(this IEnumerable> source) - { - // get enumerators from all inner IAsyncEnumerable - var enumerators = source.Select(x => x.GetAsyncEnumerator()).ToList(); - - List, bool)>> runningTasks = new(); - - // start all inner IAsyncEnumerable - foreach (var asyncEnumerator in enumerators) - { - runningTasks.Add(MoveNextWrapped(asyncEnumerator)); - } - - // while there are any running tasks - while (runningTasks.Count != 0) - { - // get next finished task and remove it from list - var finishedTask = await Task.WhenAny(runningTasks).ConfigureAwait(false); - runningTasks.Remove(finishedTask); - - // get result from finished IAsyncEnumerable - var result = await finishedTask.ConfigureAwait(false); - var asyncEnumerator = result.Item1; - var hasItem = result.Item2; - - // if IAsyncEnumerable has item, return it and put it back as running for next item - if (hasItem) - { - yield return asyncEnumerator.Current; - - runningTasks.Add(MoveNextWrapped(asyncEnumerator)); - } - } - - // don't forget to dispose, should be in finally - foreach (var asyncEnumerator in enumerators) - { - await asyncEnumerator.DisposeAsync().ConfigureAwait(false); - } - } - - /// - /// Helper method that returns Task with tuple of IAsyncEnumerable and it's result of MoveNextAsync. - /// - private static async Task<(IAsyncEnumerator, bool)> MoveNextWrapped( - IAsyncEnumerator asyncEnumerator - ) - { - var res = await asyncEnumerator.MoveNextAsync().ConfigureAwait(false); - return (asyncEnumerator, res); - } - - public static IAsyncEnumerable BatchAsync(this IAsyncEnumerable source, int size) => - AsyncEnumerableChunkIterator(source, size); - - private static async IAsyncEnumerable AsyncEnumerableChunkIterator( - IAsyncEnumerable source, - int size - ) - { -#pragma warning disable CA2007 - await using IAsyncEnumerator e = source.GetAsyncEnumerator(); -#pragma warning restore CA2007 - - // Before allocating anything, make sure there's at least one element. - if (await e.MoveNextAsync().ConfigureAwait(false)) - { - // Now that we know we have at least one item, allocate an initial storage array. This is not - // the array we'll yield. It starts out small in order to avoid significantly overallocating - // when the source has many fewer elements than the chunk size. - int arraySize = Math.Min(size, 4); - int i; - do - { - var array = new TSource[arraySize]; - - // Store the first item. - array[0] = e.Current; - i = 1; - - if (size != array.Length) - { - // This is the first chunk. As we fill the array, grow it as needed. - for (; i < size && await e.MoveNextAsync().ConfigureAwait(false); i++) - { - if (i >= array.Length) - { - arraySize = (int)Math.Min((uint)size, 2 * (uint)array.Length); - Array.Resize(ref array, arraySize); - } - - array[i] = e.Current; - } - } - else - { - // For all but the first chunk, the array will already be correctly sized. - // We can just store into it until either it's full or MoveNext returns false. - TSource[] local = array; // avoid bounds checks by using cached local (`array` is lifted to iterator object as a field) - Debug.Assert(local.Length == size); - for (; (uint)i < (uint)local.Length && await e.MoveNextAsync().ConfigureAwait(false); i++) - { - local[i] = e.Current; - } - } - - if (i != array.Length) - { - Array.Resize(ref array, i); - } - - yield return array; - } while (i >= size && await e.MoveNextAsync().ConfigureAwait(false)); - } - } - - public static IEnumerable Batch(this IEnumerable source, int size) - { - if (source is TSource[] array) - { - // Special-case arrays, which have an immutable length. This enables us to not only do an - // empty check and avoid allocating an iterator object when empty, it enables us to have a - // much more efficient (and simpler) implementation for chunking up the array. - return array.Length != 0 ? ArrayChunkIterator(array, size) : []; - } - - return EnumerableChunkIterator(source, size); - } - - private static IEnumerable ArrayChunkIterator(TSource[] source, int size) - { - int index = 0; - while (index < source.Length) - { - TSource[] chunk = new ReadOnlySpan(source, index, Math.Min(size, source.Length - index)).ToArray(); - index += chunk.Length; - yield return chunk; - } - } - - private static IEnumerable EnumerableChunkIterator(IEnumerable source, int size) - { - using IEnumerator e = source.GetEnumerator(); - - // Before allocating anything, make sure there's at least one element. - if (e.MoveNext()) - { - // Now that we know we have at least one item, allocate an initial storage array. This is not - // the array we'll yield. It starts out small in order to avoid significantly overallocating - // when the source has many fewer elements than the chunk size. - int arraySize = Math.Min(size, 4); - int i; - do - { - var array = new TSource[arraySize]; - - // Store the first item. - array[0] = e.Current; - i = 1; - - if (size != array.Length) - { - // This is the first chunk. As we fill the array, grow it as needed. - for (; i < size && e.MoveNext(); i++) - { - if (i >= array.Length) - { - arraySize = (int)Math.Min((uint)size, 2 * (uint)array.Length); - Array.Resize(ref array, arraySize); - } - - array[i] = e.Current; - } - } - else - { - // For all but the first chunk, the array will already be correctly sized. - // We can just store into it until either it's full or MoveNext returns false. - TSource[] local = array; // avoid bounds checks by using cached local (`array` is lifted to iterator object as a field) - Debug.Assert(local.Length == size); - for (; (uint)i < (uint)local.Length && e.MoveNext(); i++) - { - local[i] = e.Current; - } - } - - if (i != array.Length) - { - Array.Resize(ref array, i); - } - - yield return array; - } while (i >= size && e.MoveNext()); - } - } } diff --git a/src/Speckle.Sdk/Serialisation/V2/Receive/DeserializeProcess.cs b/src/Speckle.Sdk/Serialisation/V2/Receive/DeserializeProcess.cs index 997343db..322cc92e 100644 --- a/src/Speckle.Sdk/Serialisation/V2/Receive/DeserializeProcess.cs +++ b/src/Speckle.Sdk/Serialisation/V2/Receive/DeserializeProcess.cs @@ -5,14 +5,16 @@ namespace Speckle.Sdk.Serialisation.V2.Receive; -public record DeserializeOptions(bool? SkipCacheCheck = null); +public record DeserializeOptions(bool SkipCache); public sealed class DeserializeProcess(IProgress? progress, IObjectLoader objectLoader) { private readonly ConcurrentDictionary)> _closures = new(); private long _total; + private DeserializeOptions _options = new(false); public ConcurrentDictionary BaseCache { get; } = new(); + private readonly ConcurrentDictionary _activeTasks = new(); public async Task Deserialize( string rootId, @@ -20,8 +22,9 @@ public async Task Deserialize( DeserializeOptions? options = null ) { + _options = options ?? _options; var (rootJson, childrenIds) = await objectLoader - .GetAndCache(rootId, cancellationToken, options) + .GetAndCache(rootId, _options, cancellationToken) .ConfigureAwait(false); _total = childrenIds.Count; _closures.TryAdd(rootId, (rootJson, childrenIds)); @@ -40,13 +43,17 @@ private async Task Traverse(string id, CancellationToken cancellationToken) var tasks = new List(); foreach (var childId in childIds) { - lock (BaseCache) + if (BaseCache.ContainsKey(childId)) { - if (BaseCache.ContainsKey(childId)) - { - continue; - } + continue; + } + if (_activeTasks.TryGetValue(childId, out var task)) + { + tasks.Add(task); + } + else + { // tmp is necessary because of the way closures close over loop variables var tmpId = childId; Task t = Task @@ -58,6 +65,7 @@ private async Task Traverse(string id, CancellationToken cancellationToken) ) .Unwrap(); tasks.Add(t); + _activeTasks.TryAdd(childId, t); } } @@ -102,6 +110,7 @@ public void DecodeOrEnqueueChildren(string id) BaseCache.TryAdd(id, @base); //remove from JSON cache because we've finally made the Base _closures.TryRemove(id, out _); + _activeTasks.TryRemove(id, out _); } private Base Deserialise(string id, string json) diff --git a/src/Speckle.Sdk/Serialisation/V2/Receive/ObjectLoader.cs b/src/Speckle.Sdk/Serialisation/V2/Receive/ObjectLoader.cs index 16cf90b3..e890f977 100644 --- a/src/Speckle.Sdk/Serialisation/V2/Receive/ObjectLoader.cs +++ b/src/Speckle.Sdk/Serialisation/V2/Receive/ObjectLoader.cs @@ -1,4 +1,3 @@ -using System.Collections.Concurrent; using Speckle.InterfaceGenerator; using Speckle.Sdk.Common; using Speckle.Sdk.Serialisation.Utilities; @@ -12,115 +11,95 @@ public sealed class ObjectLoader( IServerObjectManager serverObjectManager, string streamId, IProgress? progress -) : IObjectLoader +) : ChannelLoader, IObjectLoader { - private const int HTTP_ID_CHUNK_SIZE = 500; - private const int CACHE_CHUNK_SIZE = 3000; - private const int MAX_PARALLELISM_HTTP = 4; - private int? _allChildrenCount; + private long _checkCache; + private long _cached; + private DeserializeOptions _options = new(false); public async Task<(string, IReadOnlyList)> GetAndCache( string rootId, - CancellationToken cancellationToken, - DeserializeOptions? options = null + DeserializeOptions options, + CancellationToken cancellationToken ) { - var rootJson = sqLiteCacheManager.GetObject(rootId); - if (rootJson != null) + _options = options; + string? rootJson; + if (!options.SkipCache) { - //assume everything exists as the root is there. - var allChildren = ClosureParser.GetChildrenIds(rootJson).ToList(); - return (rootJson, allChildren); + rootJson = sqLiteCacheManager.GetObject(rootId); + if (rootJson != null) + { + //assume everything exists as the root is there. + var allChildren = ClosureParser.GetChildrenIds(rootJson).ToList(); + return (rootJson, allChildren); + } } rootJson = await serverObjectManager .DownloadSingleObject(streamId, rootId, progress, cancellationToken) .NotNull() .ConfigureAwait(false); - var allChildrenIds = ClosureParser + List allChildrenIds = ClosureParser .GetClosures(rootJson) .OrderByDescending(x => x.Item2) .Select(x => x.Item1) .Where(x => !x.StartsWith("blob", StringComparison.Ordinal)) .ToList(); _allChildrenCount = allChildrenIds.Count; - if (!(options?.SkipCacheCheck ?? false)) - { - var idsToDownload = CheckCache(allChildrenIds); - await DownloadAndCache(idsToDownload, cancellationToken).ConfigureAwait(false); - } + await GetAndCache(allChildrenIds, cancellationToken).ConfigureAwait(false); + //save the root last to shortcut later sqLiteCacheManager.SaveObjectSync(rootId, rootJson); return (rootJson, allChildrenIds); } - private async IAsyncEnumerable CheckCache(IReadOnlyList childrenIds) + [AutoInterfaceIgnore] + public override string? CheckCache(string id) { - var count = 0L; - progress?.Report(new(ProgressEvent.CacheCheck, count, childrenIds.Count)); - await foreach ( - var (id, result) in childrenIds - .Batch(CACHE_CHUNK_SIZE) - .Select(x => sqLiteCacheManager.HasObjects2(x)) // there needs to be a Task somewhere here - .SelectManyAsync() - ) + _checkCache++; + progress?.Report(new(ProgressEvent.CacheCheck, _checkCache, _allChildrenCount)); + if (!_options.SkipCache && !sqLiteCacheManager.HasObject(id)) { - count++; - progress?.Report(new(ProgressEvent.CacheCheck, count, childrenIds.Count)); - if (!result) - { - yield return id; - } + return id; } + + return null; } - private async Task DownloadAndCache(IAsyncEnumerable ids, CancellationToken cancellationToken) + [AutoInterfaceIgnore] + public override async Task> DownloadAndCache(List ids) { var count = 0L; progress?.Report(new(ProgressEvent.DownloadObject, count, _allChildrenCount)); var toCache = new List<(string, string)>(); - var tasks = new ConcurrentBag(); - using SemaphoreSlim ss = new(MAX_PARALLELISM_HTTP, MAX_PARALLELISM_HTTP); - await foreach (var idBatch in ids.BatchAsync(HTTP_ID_CHUNK_SIZE).WithCancellation(cancellationToken)) + await foreach ( + var (id, json) in serverObjectManager.DownloadObjects( + streamId, + ids.Select(x => x.NotNull()).ToList(), + progress, + default + ) + ) { - await ss.WaitAsync(cancellationToken).ConfigureAwait(false); - try - { - await foreach ( - var (id, json) in serverObjectManager.DownloadObjects(streamId, idBatch, progress, cancellationToken) - ) - { - count++; - progress?.Report(new(ProgressEvent.DownloadObject, count, _allChildrenCount)); - toCache.Add((id, json)); - if (toCache.Count >= CACHE_CHUNK_SIZE) - { - var toSave = toCache; - toCache = new List<(string, string)>(); -#pragma warning disable CA2008 - tasks.Add( - Task.Factory.StartNew(() => sqLiteCacheManager.SaveObjects(toSave, cancellationToken), cancellationToken) - ); -#pragma warning restore CA2008 - } - } - } - finally - { - ss.Release(); - } + count++; + progress?.Report(new(ProgressEvent.DownloadObject, count, _allChildrenCount)); + toCache.Add((id, json)); } - if (toCache.Count > 0) + return toCache; + } + + [AutoInterfaceIgnore] + public override void SaveToCache((string, string) x) + { + if (!_options.SkipCache) { -#pragma warning disable CA2008 - tasks.Add( - Task.Factory.StartNew(() => sqLiteCacheManager.SaveObjects(toCache, cancellationToken), cancellationToken) - ); -#pragma warning restore CA2008 + sqLiteCacheManager.SaveObjectSync(x.Item1, x.Item2); } - await Task.WhenAll(tasks).ConfigureAwait(false); + _cached++; + progress?.Report(new(ProgressEvent.Cached, _cached, _allChildrenCount)); } public string? LoadId(string id) => sqLiteCacheManager.GetObject(id); diff --git a/src/Speckle.Sdk/Serialisation/V2/SQLiteCacheManager.cs b/src/Speckle.Sdk/Serialisation/V2/SQLiteCacheManager.cs index 8bda8973..42309985 100644 --- a/src/Speckle.Sdk/Serialisation/V2/SQLiteCacheManager.cs +++ b/src/Speckle.Sdk/Serialisation/V2/SQLiteCacheManager.cs @@ -73,43 +73,6 @@ content TEXT cmd2.ExecuteNonQuery(); } - public IEnumerable<(string, string)> GetObjects(IEnumerable ids, CancellationToken cancellationToken) - { - cancellationToken.ThrowIfCancellationRequested(); - using var c = new SqliteConnection(_connectionString); - c.Open(); - using var command = new SqliteCommand("SELECT content FROM objects WHERE hash = @hash LIMIT 1 ", c); - foreach (var id in ids) - { - command.Parameters.Clear(); - command.Parameters.AddWithValue("@hash", id); - using var reader = command.ExecuteReader(); - if (reader.Read()) - { - yield return (id, reader.GetString(0)); - } - } - } - - public void SaveObjects(IEnumerable<(string, string)> objects, CancellationToken cancellationToken) - { - using var c = new SqliteConnection(_connectionString); - c.Open(); - using var t = c.BeginTransaction(); - const string COMMAND_TEXT = "INSERT OR IGNORE INTO objects(hash, content) VALUES(@hash, @content)"; - - foreach (var (id, content) in objects) - { - using var command = new SqliteCommand(COMMAND_TEXT, c, t); - command.Parameters.AddWithValue("@hash", id); - command.Parameters.AddWithValue("@content", content); - command.ExecuteNonQuery(); - cancellationToken.ThrowIfCancellationRequested(); - } - - t.Commit(); - } - public string? GetObject(string id) { using var c = new SqliteConnection(_connectionString); @@ -124,22 +87,17 @@ public void SaveObjects(IEnumerable<(string, string)> objects, CancellationToken return null; // pass on the duty of null checks to consumers } - public async IAsyncEnumerable<(string, bool)> HasObjects2(IEnumerable objectIds) + public bool HasObject(string objectId) { - await Task.Delay(10).ConfigureAwait(false); using var c = new SqliteConnection(_connectionString); c.Open(); const string COMMAND_TEXT = "SELECT 1 FROM objects WHERE hash = @hash LIMIT 1 "; using var command = new SqliteCommand(COMMAND_TEXT, c); - foreach (string objectId in objectIds) - { - command.Parameters.Clear(); - command.Parameters.AddWithValue("@hash", objectId); + command.Parameters.AddWithValue("@hash", objectId); - using var reader = command.ExecuteReader(); - bool rowFound = reader.Read(); - yield return (objectId, rowFound); - } + using var reader = command.ExecuteReader(); + bool rowFound = reader.Read(); + return rowFound; } public void SaveObjectSync(string hash, string serializedObject) diff --git a/src/Speckle.Sdk/Speckle.Sdk.csproj b/src/Speckle.Sdk/Speckle.Sdk.csproj index 663bca5d..602f180d 100644 --- a/src/Speckle.Sdk/Speckle.Sdk.csproj +++ b/src/Speckle.Sdk/Speckle.Sdk.csproj @@ -34,13 +34,13 @@ - + - + - + diff --git a/src/Speckle.Sdk/Transports/ProgressArgs.cs b/src/Speckle.Sdk/Transports/ProgressArgs.cs index 4948cbca..f39a49bf 100644 --- a/src/Speckle.Sdk/Transports/ProgressArgs.cs +++ b/src/Speckle.Sdk/Transports/ProgressArgs.cs @@ -5,6 +5,7 @@ namespace Speckle.Sdk.Transports; public enum ProgressEvent { CacheCheck, + Cached, DownloadBytes, UploadBytes, DownloadObject, diff --git a/src/Speckle.Sdk/Transports/ServerUtils/ServerAPI.cs b/src/Speckle.Sdk/Transports/ServerUtils/ServerAPI.cs index 78b268fc..0632e9fd 100644 --- a/src/Speckle.Sdk/Transports/ServerUtils/ServerAPI.cs +++ b/src/Speckle.Sdk/Transports/ServerUtils/ServerAPI.cs @@ -71,7 +71,7 @@ public void Dispose() Method = HttpMethod.Get, }; - HttpResponseMessage rootHttpResponse = await _client + var rootHttpResponse = await _client .SendAsync(rootHttpMessage, HttpCompletionOption.ResponseContentRead, CancellationToken) .ConfigureAwait(false); @@ -248,7 +248,7 @@ public async Task UploadBlobs( try { - HttpResponseMessage response = await _client.SendAsync(message, CancellationToken).ConfigureAwait(false); + var response = await _client.SendAsync(message, CancellationToken).ConfigureAwait(false); response.EnsureSuccessStatusCode(); @@ -369,7 +369,7 @@ private async Task> HasObjectsImpl(string streamId, IRe var uri = new Uri($"/api/diff/{streamId}", UriKind.Relative); using StringContent stringContent = new(serializedPayload, Encoding.UTF8, "application/json"); - HttpResponseMessage response = await _client.PostAsync(uri, stringContent, CancellationToken).ConfigureAwait(false); + var response = await _client.PostAsync(uri, stringContent, CancellationToken).ConfigureAwait(false); response.EnsureSuccessStatusCode(); @@ -427,7 +427,7 @@ private async Task UploadObjectsImpl( } } message.Content = new ProgressContent(multipart, progress); - HttpResponseMessage response = await _client.SendAsync(message, CancellationToken).ConfigureAwait(false); + var response = await _client.SendAsync(message, CancellationToken).ConfigureAwait(false); response.EnsureSuccessStatusCode(); } @@ -441,7 +441,7 @@ public async Task> HasBlobs(string streamId, IReadOnlyList using StringContent stringContent = new(payload, Encoding.UTF8, "application/json"); - HttpResponseMessage response = await _client.PostAsync(uri, stringContent, CancellationToken).ConfigureAwait(false); + var response = await _client.PostAsync(uri, stringContent, CancellationToken).ConfigureAwait(false); response.EnsureSuccessStatusCode(); diff --git a/src/Speckle.Sdk/packages.lock.json b/src/Speckle.Sdk/packages.lock.json index e8f71a28..b1642921 100644 --- a/src/Speckle.Sdk/packages.lock.json +++ b/src/Speckle.Sdk/packages.lock.json @@ -318,12 +318,6 @@ "System.Reactive": "5.0.0" } }, - "Microsoft.Bcl.AsyncInterfaces": { - "type": "Direct", - "requested": "[5.0.0, )", - "resolved": "5.0.0", - "contentHash": "W8DPQjkMScOMTtJbPwmPyj9c3zYSFGawDW3jwlBOOsnY+EzZFLgNQ/UMkK35JmkNOVPdCyPr2Tw7Vv9N+KA3ZQ==" - }, "Microsoft.CSharp": { "type": "Direct", "requested": "[4.7.0, )", diff --git a/tests/Speckle.Objects.Tests.Unit/packages.lock.json b/tests/Speckle.Objects.Tests.Unit/packages.lock.json index d0322f30..845c4b28 100644 --- a/tests/Speckle.Objects.Tests.Unit/packages.lock.json +++ b/tests/Speckle.Objects.Tests.Unit/packages.lock.json @@ -274,7 +274,6 @@ "type": "Project", "dependencies": { "GraphQL.Client": "[6.0.0, )", - "Microsoft.Bcl.AsyncInterfaces": "[5.0.0, )", "Microsoft.CSharp": "[4.7.0, )", "Microsoft.Data.Sqlite": "[7.0.7, )", "Microsoft.Extensions.DependencyInjection.Abstractions": "[2.2.0, )", @@ -298,12 +297,6 @@ "System.Reactive": "5.0.0" } }, - "Microsoft.Bcl.AsyncInterfaces": { - "type": "CentralTransitive", - "requested": "[5.0.0, )", - "resolved": "5.0.0", - "contentHash": "W8DPQjkMScOMTtJbPwmPyj9c3zYSFGawDW3jwlBOOsnY+EzZFLgNQ/UMkK35JmkNOVPdCyPr2Tw7Vv9N+KA3ZQ==" - }, "Microsoft.CSharp": { "type": "CentralTransitive", "requested": "[4.7.0, )", diff --git a/tests/Speckle.Sdk.Serialization.Testing/packages.lock.json b/tests/Speckle.Sdk.Serialization.Testing/packages.lock.json index 760984b4..2f55ca45 100644 --- a/tests/Speckle.Sdk.Serialization.Testing/packages.lock.json +++ b/tests/Speckle.Sdk.Serialization.Testing/packages.lock.json @@ -338,7 +338,6 @@ "type": "Project", "dependencies": { "GraphQL.Client": "[6.0.0, )", - "Microsoft.Bcl.AsyncInterfaces": "[5.0.0, )", "Microsoft.CSharp": "[4.7.0, )", "Microsoft.Data.Sqlite": "[7.0.7, )", "Microsoft.Extensions.DependencyInjection.Abstractions": "[2.2.0, )", @@ -391,8 +390,8 @@ "Microsoft.Bcl.AsyncInterfaces": { "type": "CentralTransitive", "requested": "[5.0.0, )", - "resolved": "5.0.0", - "contentHash": "W8DPQjkMScOMTtJbPwmPyj9c3zYSFGawDW3jwlBOOsnY+EzZFLgNQ/UMkK35JmkNOVPdCyPr2Tw7Vv9N+KA3ZQ==" + "resolved": "1.1.0", + "contentHash": "1Am6l4Vpn3/K32daEqZI+FFr96OlZkgwK2LcT3pZ2zWubR5zTPW3/FkO1Rat9kb7oQOa4rxgl9LJHc5tspCWfg==" }, "Microsoft.CSharp": { "type": "CentralTransitive", diff --git a/tests/Speckle.Sdk.Serialization.Tests/SerializationTests.cs b/tests/Speckle.Sdk.Serialization.Tests/SerializationTests.cs index ee3f3379..1fb9cfab 100644 --- a/tests/Speckle.Sdk.Serialization.Tests/SerializationTests.cs +++ b/tests/Speckle.Sdk.Serialization.Tests/SerializationTests.cs @@ -20,8 +20,8 @@ private class TestLoader(string json) : IObjectLoader { public Task<(string, IReadOnlyList)> GetAndCache( string rootId, - CancellationToken cancellationToken, - DeserializeOptions? options = null + DeserializeOptions? options, + CancellationToken cancellationToken ) { var childrenIds = ClosureParser.GetChildrenIds(json).ToList(); @@ -78,8 +78,8 @@ public class TestObjectLoader(Dictionary idToObject) : IObjectLo { public Task<(string, IReadOnlyList)> GetAndCache( string rootId, - CancellationToken cancellationToken, - DeserializeOptions? options = default + DeserializeOptions? options, + CancellationToken cancellationToken ) { var json = idToObject.GetValueOrDefault(rootId); diff --git a/tests/Speckle.Sdk.Serialization.Tests/packages.lock.json b/tests/Speckle.Sdk.Serialization.Tests/packages.lock.json index d0322f30..845c4b28 100644 --- a/tests/Speckle.Sdk.Serialization.Tests/packages.lock.json +++ b/tests/Speckle.Sdk.Serialization.Tests/packages.lock.json @@ -274,7 +274,6 @@ "type": "Project", "dependencies": { "GraphQL.Client": "[6.0.0, )", - "Microsoft.Bcl.AsyncInterfaces": "[5.0.0, )", "Microsoft.CSharp": "[4.7.0, )", "Microsoft.Data.Sqlite": "[7.0.7, )", "Microsoft.Extensions.DependencyInjection.Abstractions": "[2.2.0, )", @@ -298,12 +297,6 @@ "System.Reactive": "5.0.0" } }, - "Microsoft.Bcl.AsyncInterfaces": { - "type": "CentralTransitive", - "requested": "[5.0.0, )", - "resolved": "5.0.0", - "contentHash": "W8DPQjkMScOMTtJbPwmPyj9c3zYSFGawDW3jwlBOOsnY+EzZFLgNQ/UMkK35JmkNOVPdCyPr2Tw7Vv9N+KA3ZQ==" - }, "Microsoft.CSharp": { "type": "CentralTransitive", "requested": "[4.7.0, )", diff --git a/tests/Speckle.Sdk.Tests.Integration/packages.lock.json b/tests/Speckle.Sdk.Tests.Integration/packages.lock.json index 5e67102e..359ab637 100644 --- a/tests/Speckle.Sdk.Tests.Integration/packages.lock.json +++ b/tests/Speckle.Sdk.Tests.Integration/packages.lock.json @@ -268,7 +268,6 @@ "type": "Project", "dependencies": { "GraphQL.Client": "[6.0.0, )", - "Microsoft.Bcl.AsyncInterfaces": "[5.0.0, )", "Microsoft.CSharp": "[4.7.0, )", "Microsoft.Data.Sqlite": "[7.0.7, )", "Microsoft.Extensions.DependencyInjection.Abstractions": "[2.2.0, )", @@ -305,12 +304,6 @@ "System.Reactive": "5.0.0" } }, - "Microsoft.Bcl.AsyncInterfaces": { - "type": "CentralTransitive", - "requested": "[5.0.0, )", - "resolved": "5.0.0", - "contentHash": "W8DPQjkMScOMTtJbPwmPyj9c3zYSFGawDW3jwlBOOsnY+EzZFLgNQ/UMkK35JmkNOVPdCyPr2Tw7Vv9N+KA3ZQ==" - }, "Microsoft.CSharp": { "type": "CentralTransitive", "requested": "[4.7.0, )", diff --git a/tests/Speckle.Sdk.Tests.Performance/packages.lock.json b/tests/Speckle.Sdk.Tests.Performance/packages.lock.json index c3b5098b..23ee2f29 100644 --- a/tests/Speckle.Sdk.Tests.Performance/packages.lock.json +++ b/tests/Speckle.Sdk.Tests.Performance/packages.lock.json @@ -356,7 +356,6 @@ "type": "Project", "dependencies": { "GraphQL.Client": "[6.0.0, )", - "Microsoft.Bcl.AsyncInterfaces": "[5.0.0, )", "Microsoft.CSharp": "[4.7.0, )", "Microsoft.Data.Sqlite": "[7.0.7, )", "Microsoft.Extensions.DependencyInjection.Abstractions": "[2.2.0, )", @@ -383,8 +382,8 @@ "Microsoft.Bcl.AsyncInterfaces": { "type": "CentralTransitive", "requested": "[5.0.0, )", - "resolved": "5.0.0", - "contentHash": "W8DPQjkMScOMTtJbPwmPyj9c3zYSFGawDW3jwlBOOsnY+EzZFLgNQ/UMkK35JmkNOVPdCyPr2Tw7Vv9N+KA3ZQ==" + "resolved": "1.1.0", + "contentHash": "1Am6l4Vpn3/K32daEqZI+FFr96OlZkgwK2LcT3pZ2zWubR5zTPW3/FkO1Rat9kb7oQOa4rxgl9LJHc5tspCWfg==" }, "Microsoft.CSharp": { "type": "CentralTransitive", diff --git a/tests/Speckle.Sdk.Tests.Unit/packages.lock.json b/tests/Speckle.Sdk.Tests.Unit/packages.lock.json index 08bb4fbe..b4ab1e60 100644 --- a/tests/Speckle.Sdk.Tests.Unit/packages.lock.json +++ b/tests/Speckle.Sdk.Tests.Unit/packages.lock.json @@ -283,7 +283,6 @@ "type": "Project", "dependencies": { "GraphQL.Client": "[6.0.0, )", - "Microsoft.Bcl.AsyncInterfaces": "[5.0.0, )", "Microsoft.CSharp": "[4.7.0, )", "Microsoft.Data.Sqlite": "[7.0.7, )", "Microsoft.Extensions.DependencyInjection.Abstractions": "[2.2.0, )", @@ -307,12 +306,6 @@ "System.Reactive": "5.0.0" } }, - "Microsoft.Bcl.AsyncInterfaces": { - "type": "CentralTransitive", - "requested": "[5.0.0, )", - "resolved": "5.0.0", - "contentHash": "W8DPQjkMScOMTtJbPwmPyj9c3zYSFGawDW3jwlBOOsnY+EzZFLgNQ/UMkK35JmkNOVPdCyPr2Tw7Vv9N+KA3ZQ==" - }, "Microsoft.CSharp": { "type": "CentralTransitive", "requested": "[4.7.0, )",