From cfacef58e35201528f275d4e40e80ce9f9bc8737 Mon Sep 17 00:00:00 2001 From: JacopoWolf Date: Wed, 13 May 2020 23:21:20 +0200 Subject: [PATCH] Implemented generic async wrapper - completed async logic core - closes #41 - closes #42 --- StackInjector/Core/AsyncStackWrapperCore.cs | 72 ++++++++++++++++++ .../AsyncStackWrapperCore.logic.cs} | 54 ++++++-------- StackInjector/Core/Cloning/ClonedCore.cs | 5 ++ StackInjector/Core/Cloning/IClonedCore.cs | 12 +++ StackInjector/Core/IAsyncStackWrapperCore.cs | 22 +++++- StackInjector/Core/StackWrapperCore.cs | 4 +- StackInjector/Core/WrapperCore.logic.cs | 16 ++-- StackInjector/Injector.cs | 41 ++++++++++- .../Settings/StackWrapperSettings.cs | 2 +- StackInjector/Wrappers/AsyncStackWrapper.cs | 73 +++---------------- .../Wrappers/Generic/AsyncStackWrapper.cs | 39 ++++++++++ .../Wrappers/Generic/IAsyncStackWrapper.cs | 21 ++---- StackInjector/Wrappers/IAsyncStackWrapper.cs | 18 +---- StackInjector/Wrappers/StackWrapper.cs | 2 +- .../Services/PowElaborators.cs | 4 + .../Services/TestGenerator.cs | 4 +- tests/StackInjector.TEST.Async/TestProgram.cs | 39 +++++++++- 17 files changed, 279 insertions(+), 149 deletions(-) create mode 100644 StackInjector/Core/AsyncStackWrapperCore.cs rename StackInjector/{Wrappers/AsyncStackWrapper.logic.cs => Core/AsyncStackWrapperCore.logic.cs} (57%) create mode 100644 StackInjector/Wrappers/Generic/AsyncStackWrapper.cs diff --git a/StackInjector/Core/AsyncStackWrapperCore.cs b/StackInjector/Core/AsyncStackWrapperCore.cs new file mode 100644 index 0000000..77ecae6 --- /dev/null +++ b/StackInjector/Core/AsyncStackWrapperCore.cs @@ -0,0 +1,72 @@ +using System; +using System.Collections.Generic; +using System.Data; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using StackInjector.Wrappers; + +namespace StackInjector.Core +{ + internal abstract partial class AsyncStackWrapperCore : AsyncStackWrapperCore, IAsyncStackWrapperCore + { + + // used to cancel everything + protected internal readonly CancellationTokenSource cancelPendingTasksSource = new CancellationTokenSource(); + + // exposes the token + public CancellationToken PendingTasksCancellationToken + => this.cancelPendingTasksSource.Token; + + // used to lock access to tasks + protected internal readonly object listAccessLock = new object(); + + // asyncronously waited for new events if TaskList is empty + protected internal readonly SemaphoreSlim emptyListAwaiter = new SemaphoreSlim(0); + + // pending tasks + protected internal LinkedList> tasks = new LinkedList>(); + + + internal AsyncStackWrapperCore ( WrapperCore core, Type toRegister ) : base(core, toRegister) + { + // register an event that in case the list is empty, release the empty event listener. + this.cancelPendingTasksSource.Token.Register(this.ReleaseListAwaiter); + } + + + + #region IDisposable Support + + private bool disposedValue = false; + + public override void Dispose () + { + if( !this.disposedValue ) + { + + // managed resources + this.cancelPendingTasksSource.Cancel(); + this.ReleaseListAwaiter(); // in case it's waiting on the empty list + + this.cancelPendingTasksSource.Dispose(); + this.emptyListAwaiter.Dispose(); + + + // big objects + this.tasks.Clear(); + this.tasks = null; + + // clean instantiated objects + this.Core.RemoveInstancesDiff(); + + + this.disposedValue = true; + } + } + + #endregion + + } +} diff --git a/StackInjector/Wrappers/AsyncStackWrapper.logic.cs b/StackInjector/Core/AsyncStackWrapperCore.logic.cs similarity index 57% rename from StackInjector/Wrappers/AsyncStackWrapper.logic.cs rename to StackInjector/Core/AsyncStackWrapperCore.logic.cs index 7471ebb..3fb3932 100644 --- a/StackInjector/Wrappers/AsyncStackWrapper.logic.cs +++ b/StackInjector/Core/AsyncStackWrapperCore.logic.cs @@ -1,22 +1,23 @@ -using System.Collections.Generic; +using System; +using System.Collections.Generic; using System.Linq; +using System.Text; using System.Threading.Tasks; -namespace StackInjector.Wrappers +namespace StackInjector.Core { - - internal partial class AsyncStackWrapper + internal abstract partial class AsyncStackWrapperCore { - - /// - public void Submit ( object submitted ) + // call the semaphore + protected internal void ReleaseListAwaiter () { - var task = this.GetAsyncEntryPoint().Digest(submitted,this.cancelPendingTasksSource.Token); - + this.emptyListAwaiter.Release(); + } + public void Submit ( Task work ) + { lock( this.listAccessLock ) - this.tasks.AddLast(task); - + this.tasks.AddLast( work ); // if the list was empty just an item ago, signal it's not anymore. // this limit avoids useless cross thread calls that would slow everything down. @@ -24,12 +25,15 @@ public void Submit ( object submitted ) this.ReleaseListAwaiter(); } + public bool AnyTaskLeft () + { + lock( this.listAccessLock ) + return this.tasks.Any(); + } - - /// - public async IAsyncEnumerable Elaborated () + public async IAsyncEnumerable Elaborated () { - while( !this.cancelPendingTasksSource.IsCancellationRequested ) + while( ! this.cancelPendingTasksSource.IsCancellationRequested ) { // avoid deadlocks if( this.tasks.Any() ) @@ -39,7 +43,7 @@ public async IAsyncEnumerable Elaborated () lock( this.listAccessLock ) this.tasks.Remove(completed); - yield return (T)completed.Result; + yield return completed.Result; continue; } else @@ -51,21 +55,5 @@ public async IAsyncEnumerable Elaborated () } } - /// - public bool AnyTaskLeft () - { - lock( this.listAccessLock ) - return this.tasks.Any(); - } - - - /// - /// gets the entry point for this stack - /// - /// - internal IAsyncStackEntryPoint GetAsyncEntryPoint () - => - this.Core.GetEntryPoint(); - } -} +} \ No newline at end of file diff --git a/StackInjector/Core/Cloning/ClonedCore.cs b/StackInjector/Core/Cloning/ClonedCore.cs index 84dc950..b80148e 100644 --- a/StackInjector/Core/Cloning/ClonedCore.cs +++ b/StackInjector/Core/Cloning/ClonedCore.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using System.Text; using StackInjector.Wrappers; +using StackInjector.Wrappers.Generic; namespace StackInjector.Core.Cloning { @@ -25,6 +26,10 @@ public IAsyncStackWrapper ToAsyncWrapper () where T : IAsyncStackEntryPoint return wrapper; } + //todo implement + public IAsyncStackWrapper ToGenericAsync ( AsyncStackDigest digest ) + => throw new NotImplementedException(); + public IStackWrapper ToWrapper () where T : IStackEntryPoint { diff --git a/StackInjector/Core/Cloning/IClonedCore.cs b/StackInjector/Core/Cloning/IClonedCore.cs index 1029dde..b0f2d46 100644 --- a/StackInjector/Core/Cloning/IClonedCore.cs +++ b/StackInjector/Core/Cloning/IClonedCore.cs @@ -1,4 +1,5 @@ using StackInjector.Wrappers; +using StackInjector.Wrappers.Generic; namespace StackInjector.Core.Cloning { @@ -23,6 +24,17 @@ public interface IClonedCore /// the new wrapper IAsyncStackWrapper ToAsyncWrapper () where T : IAsyncStackEntryPoint; + + /// + /// convert this to an + /// + /// entry instantiation poin of the new wrapper + /// type of input elements + /// type of output elements + /// action to perform on elements + /// the new wrapper + IAsyncStackWrapper ToGenericAsync ( AsyncStackDigest digest ); + } } diff --git a/StackInjector/Core/IAsyncStackWrapperCore.cs b/StackInjector/Core/IAsyncStackWrapperCore.cs index 0a92912..72cbb32 100644 --- a/StackInjector/Core/IAsyncStackWrapperCore.cs +++ b/StackInjector/Core/IAsyncStackWrapperCore.cs @@ -2,21 +2,37 @@ using System.Collections.Generic; using System.Text; using System.Threading; +using System.Threading.Tasks; namespace StackInjector.Core { /// - /// base interface for all asyncronous stackwrappers + /// base interface for all asyncronous stackwrappers. /// - public interface IAsyncStackWrapperCore : IStackWrapperCore + /// the type tasks will return + public interface IAsyncStackWrapperCore : IStackWrapperCore { /// /// Used to signal cancellation of every pending task /// - public CancellationToken PendingTasksCancellationToken { get; } + CancellationToken PendingTasksCancellationToken { get; } + /// + /// submit new work to this wrapper + /// + /// + void Submit ( Task work ); + + /// + /// The loop you ca use to await foreach tasks in elaboration, converted to the specified type. + /// When the pending tasks list is empty, unless is explocitly called + /// this will wait indefinitively. + /// + /// An asyncronous enumerable of completed tasks + IAsyncEnumerable Elaborated (); + /// /// check if there are tasks left to elaborate /// diff --git a/StackInjector/Core/StackWrapperCore.cs b/StackInjector/Core/StackWrapperCore.cs index 5e460fa..0580330 100644 --- a/StackInjector/Core/StackWrapperCore.cs +++ b/StackInjector/Core/StackWrapperCore.cs @@ -4,7 +4,7 @@ namespace StackInjector.Core { - internal abstract class StackWrapperCore : IStackWrapperCore + internal abstract class AsyncStackWrapperCore : IStackWrapperCore { public ref readonly StackWrapperSettings Settings => ref this.Core.settings; @@ -13,7 +13,7 @@ public ref readonly StackWrapperSettings Settings private protected readonly WrapperCore Core; - public StackWrapperCore ( WrapperCore core, Type toRegister ) + public AsyncStackWrapperCore ( WrapperCore core, Type toRegister ) { this.Core = core; diff --git a/StackInjector/Core/WrapperCore.logic.cs b/StackInjector/Core/WrapperCore.logic.cs index 1a7f0b5..6c24e8a 100644 --- a/StackInjector/Core/WrapperCore.logic.cs +++ b/StackInjector/Core/WrapperCore.logic.cs @@ -8,13 +8,11 @@ internal partial class WrapperCore internal void ServeAll () { - - ////// setting for referencing this object from instances inside - ////if( this.settings.registerSelf ) - //// this.instances.AddInstance(this.GetType(), this); - var toInject = new Queue(); + // saves time in later elaboration + this.entryPoint = this.ClassOrFromInterface(this.entryPoint); + // instantiates and enqueues the EntryPoint toInject.Enqueue ( @@ -29,6 +27,9 @@ internal void ServeAll () foreach( var service in usedServices ) toInject.Enqueue(service); } + + + } @@ -42,10 +43,7 @@ internal T GetEntryPoint () return (T)this .instances - .OfType - ( - this.ClassOrFromInterface(this.entryPoint) - ) + .OfType( this.entryPoint ) .First(); } } diff --git a/StackInjector/Injector.cs b/StackInjector/Injector.cs index 8d28585..a5fce70 100644 --- a/StackInjector/Injector.cs +++ b/StackInjector/Injector.cs @@ -3,10 +3,10 @@ using StackInjector.Exceptions; using StackInjector.Settings; using StackInjector.Wrappers; +using StackInjector.Wrappers.Generic; namespace StackInjector { - //todo edit documentation to allow clone cloning /// /// Static factory class exposing methods to create new StackWrappers /// Note that using any of the exposed methods will analyze the whole target assembly, @@ -64,8 +64,7 @@ public static IAsyncStackWrapper AsyncFrom ( StackWrapperSettings settings = // create a new async stack wrapper var core = new WrapperCore( settings ) { - entryPoint = typeof(T), - instances = new SingleInstanceHolder() + entryPoint = typeof(T) }; var wrapper = new AsyncStackWrapper(core); @@ -76,6 +75,42 @@ public static IAsyncStackWrapper AsyncFrom ( StackWrapperSettings settings = return wrapper; } + /// + /// Create a new generic asyncronous StackWrapper from the + /// entry class with the specified delegate to apply to apply as digest + /// + /// class from which start injection + /// type of elements in input + /// type of elements in output + /// delegate used to call the relative method to perform on submitted items + /// the settings to use with this object. If null, use default. + /// + public static IAsyncStackWrapper AsyncFrom + ( + AsyncStackDigest digest, + StackWrapperSettings settings = null + ) + { + if( settings == null ) + settings = StackWrapperSettings.Default; + + // create a new generic async wrapper + var core = new WrapperCore( settings ) + { + entryPoint = typeof(TEntry) + }; + + var wrapper = new AsyncStackWrapper(core) + { + StackDigest = digest + }; + + core.ReadAssemblies(); + core.ServeAll(); + + return wrapper; + } + } } diff --git a/StackInjector/Settings/StackWrapperSettings.cs b/StackInjector/Settings/StackWrapperSettings.cs index 1ce9d17..8839677 100644 --- a/StackInjector/Settings/StackWrapperSettings.cs +++ b/StackInjector/Settings/StackWrapperSettings.cs @@ -58,7 +58,7 @@ public static StackWrapperSettings Default .VersioningMethod(ServedVersionTargetingMethod.None, @override: false); - //todo maybe add a DefaultInner for nested wrappers + //? maybe add a DefaultInner for nested wrappers #endregion diff --git a/StackInjector/Wrappers/AsyncStackWrapper.cs b/StackInjector/Wrappers/AsyncStackWrapper.cs index 666bd4b..2bddc48 100644 --- a/StackInjector/Wrappers/AsyncStackWrapper.cs +++ b/StackInjector/Wrappers/AsyncStackWrapper.cs @@ -8,44 +8,26 @@ namespace StackInjector.Wrappers { [Service(DoNotServeMembers = true, Version = 2.0)] - internal partial class AsyncStackWrapper : StackWrapperCore, IAsyncStackWrapper + internal partial class AsyncStackWrapper : AsyncStackWrapperCore, IAsyncStackWrapper { - // used to cancel everything - private readonly CancellationTokenSource cancelPendingTasksSource = new CancellationTokenSource(); - - // exposes the token - public CancellationToken PendingTasksCancellationToken { get => this.cancelPendingTasksSource.Token; } - - // used to lock access to tasks - private readonly object listAccessLock = new object(); - - // asyncronously waited for new events if TaskList is empty - private readonly SemaphoreSlim emptyListAwaiter = new SemaphoreSlim(0); - - // pending tasks - private LinkedList> tasks = new LinkedList>(); - - /// /// create a new AsyncStackWrapper /// internal AsyncStackWrapper ( WrapperCore core ) : base(core, typeof(AsyncStackWrapper)) - { + { } - // register an event that in case the list is empty, release the empty event listener. - this.cancelPendingTasksSource.Token.Register(this.ReleaseListAwaiter); - } - - - /// - /// call the semaphore - /// - private void ReleaseListAwaiter () - => - this.emptyListAwaiter.Release(); + public void Submit ( object item ) + { + var task = + this + .Core + .GetEntryPoint() + .Digest(item, this.PendingTasksCancellationToken); + base.Submit(task); + } public override string ToString () @@ -53,38 +35,5 @@ public override string ToString () $"AsyncStackWrapper{{ {this.Core.instances.GetAllTypes().Count()} registered types; " + $"entry point: {this.Core.entryPoint.Name}; canceled: {this.cancelPendingTasksSource.IsCancellationRequested} }}"; - - - #region IDisposable Support - - private bool disposedValue = false; - - public override void Dispose () - { - if( !this.disposedValue ) - { - - // managed resources - this.cancelPendingTasksSource.Cancel(); - this.ReleaseListAwaiter(); // in case it's waiting on the empty list - - this.cancelPendingTasksSource.Dispose(); - this.emptyListAwaiter.Dispose(); - - - // big objects - this.tasks.Clear(); - this.tasks = null; - - // clean instantiated objects - this.Core.RemoveInstancesDiff(); - - - this.disposedValue = true; - } - } - - #endregion - } } \ No newline at end of file diff --git a/StackInjector/Wrappers/Generic/AsyncStackWrapper.cs b/StackInjector/Wrappers/Generic/AsyncStackWrapper.cs new file mode 100644 index 0000000..11e33ad --- /dev/null +++ b/StackInjector/Wrappers/Generic/AsyncStackWrapper.cs @@ -0,0 +1,39 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using StackInjector.Attributes; +using StackInjector.Core; + +namespace StackInjector.Wrappers.Generic +{ + [Service(Version = 2.1, DoNotServeMembers = true)] + internal class AsyncStackWrapper : AsyncStackWrapperCore, IAsyncStackWrapper + { + + public AsyncStackDigest StackDigest { get; internal set; } + + public AsyncStackWrapper ( WrapperCore core ) : base(core, typeof(AsyncStackWrapper)) + { } + + public void Submit ( TIn item ) + { + base.Submit + ( + this.StackDigest.Invoke + ( + this.Core.GetEntryPoint(), + item, + this.PendingTasksCancellationToken + ) + ); + } + + public override string ToString () + => + $"AsyncStackWrapper<{typeof(TEntry).Name},{typeof(TIn).Name},{typeof(TOut).Name}>" + + $"{{ {this.Core.instances.GetAllTypes().Count()} registered types; " + + $"canceled: {this.cancelPendingTasksSource.IsCancellationRequested} }}"; + + } +} diff --git a/StackInjector/Wrappers/Generic/IAsyncStackWrapper.cs b/StackInjector/Wrappers/Generic/IAsyncStackWrapper.cs index 9719ed7..ac1f857 100644 --- a/StackInjector/Wrappers/Generic/IAsyncStackWrapper.cs +++ b/StackInjector/Wrappers/Generic/IAsyncStackWrapper.cs @@ -11,13 +11,16 @@ namespace StackInjector.Wrappers.Generic /// rappresents a strongly typed generic entry point /// of an /// + /// the entry type of this object /// type on the submitted item /// type of the return item + /// the instance of the entry /// the item to elaborate /// the cancellation token used to cancel the task /// a task rappresenting the current job - public delegate Task AsyncStackDigest + public delegate Task AsyncStackDigest ( + TEntry instance, TIn item, CancellationToken cancellationToken ); @@ -26,16 +29,15 @@ CancellationToken cancellationToken /// /// Wraps a Stack of dependency-injected classes, and manages an of completed tasks. /// - /// the entry point type, from which to inject services from + /// /// type submitted to the digest function /// return type of the Digest function - public interface IAsyncStackWrapper : IAsyncStackWrapperCore + public interface IAsyncStackWrapper : IAsyncStackWrapperCore { /// /// Called to elaborate submitted items. - /// It's supposed to call a method of /// - AsyncStackDigest StackDigest { get; } + AsyncStackDigest StackDigest { get; } /// /// submit a new item to be elaborated @@ -43,14 +45,5 @@ public interface IAsyncStackWrapper : IAsyncStackWrapperCo /// the item to submit void Submit ( TIn item ); - /// - /// The loop you ca use to await foreach tasks in elaboration, converted to the specified type. - /// When the pending tasks list is empty, unless is explocitly called - /// this will wait indefinitively. - /// - /// An asyncronous enumerable of completed tasks - IAsyncEnumerable Elaborated (); - - } } diff --git a/StackInjector/Wrappers/IAsyncStackWrapper.cs b/StackInjector/Wrappers/IAsyncStackWrapper.cs index 17a594b..c6373ef 100644 --- a/StackInjector/Wrappers/IAsyncStackWrapper.cs +++ b/StackInjector/Wrappers/IAsyncStackWrapper.cs @@ -8,30 +8,14 @@ namespace StackInjector.Wrappers /// /// Wraps a Stack of dependency-injected classes, and manages an of completed tasks. /// - public interface IAsyncStackWrapper : IAsyncStackWrapperCore + public interface IAsyncStackWrapper : IAsyncStackWrapperCore { - - - /// /// Submit a new object to be elaborated asyncronously in this stack /// /// The object to elaborate void Submit ( object submitted ); - - /// - /// The loop you ca use to await foreach tasks in elaboration, converted to the specified type. - /// When the pending tasks list is empty, unless is explocitly called - /// this will wait indefinitively. - /// - /// Type to cast the object returned by the entry point - /// - /// An asyncronous enumerable of completed tasks - IAsyncEnumerable Elaborated (); - - - } } \ No newline at end of file diff --git a/StackInjector/Wrappers/StackWrapper.cs b/StackInjector/Wrappers/StackWrapper.cs index 18e80b0..e5cb148 100644 --- a/StackInjector/Wrappers/StackWrapper.cs +++ b/StackInjector/Wrappers/StackWrapper.cs @@ -5,7 +5,7 @@ namespace StackInjector.Wrappers { [Service(Version = 1.0, DoNotServeMembers = true)] - internal class StackWrapper : StackWrapperCore, IStackWrapper + internal class StackWrapper : AsyncStackWrapperCore, IStackWrapper { diff --git a/tests/StackInjector.TEST.Async/Services/PowElaborators.cs b/tests/StackInjector.TEST.Async/Services/PowElaborators.cs index 480c562..be2977f 100644 --- a/tests/StackInjector.TEST.Async/Services/PowElaborators.cs +++ b/tests/StackInjector.TEST.Async/Services/PowElaborators.cs @@ -4,6 +4,7 @@ using System.Threading; using System.Threading.Tasks; using StackInjector.Attributes; +using StackInjector.Core; using StackInjector.Wrappers; namespace StackInjector.TEST.Async.Services @@ -19,6 +20,9 @@ class MathService class PowElaborator : IAsyncStackEntryPoint { + [Served] + IStackWrapperCore wrapper; + [Served] MathService MathService { get; set; } diff --git a/tests/StackInjector.TEST.Async/Services/TestGenerator.cs b/tests/StackInjector.TEST.Async/Services/TestGenerator.cs index c1b8c47..4ac1671 100644 --- a/tests/StackInjector.TEST.Async/Services/TestGenerator.cs +++ b/tests/StackInjector.TEST.Async/Services/TestGenerator.cs @@ -47,9 +47,9 @@ async Task> waitForRes () List results = new List(); int counter = 2; - await foreach( var res in elaborationWrapper.Elaborated() ) + await foreach( var res in elaborationWrapper.Elaborated() ) if( counter++ < 18 ) // we don't want to wait forever. - results.Add(res); + results.Add( (double)res ); else break; diff --git a/tests/StackInjector.TEST.Async/TestProgram.cs b/tests/StackInjector.TEST.Async/TestProgram.cs index 471be46..cb1809b 100644 --- a/tests/StackInjector.TEST.Async/TestProgram.cs +++ b/tests/StackInjector.TEST.Async/TestProgram.cs @@ -11,7 +11,7 @@ namespace StackInjector.TEST.Async { public class Tests { - [Test][Retry(5)] + [Test] public async Task TestAsync () { var feed = Enumerable.Range(1, 11); @@ -34,7 +34,7 @@ public async Task TestAsync () ); var counter = 0; - await foreach( var result in asyncwrapper.Elaborated() ) + await foreach( var result in asyncwrapper.Elaborated() ) if( counter++ < 10 ) Console.Write($"{result}; "); else @@ -45,6 +45,41 @@ public async Task TestAsync () } + // this test is 5-10 milliseconds FASTER than the generic counterpart. + // I guess type safety really fastens it up a lot + [Test] + public async Task TestGenericAsync() + { + var feed = Enumerable.Range(1, 11); + + using var wrapper = Injector.AsyncFrom( async (i,e,t) => (double) await i.Digest(e,t) ); + + // takes up to 1 second, waiting for 0.1 seconds every submission + var feeder = Task.Run + ( + () => + { + foreach( var item in feed ) + { + wrapper.Submit(item); + + Thread.Sleep(10); // same as above + } + } + ); + + var counter = 0; + await foreach( var result in wrapper.Elaborated() ) + if( counter++ < 10 ) + Console.Write($"{result}; "); + else + break; + + Assert.AreEqual(feed.Count(), counter); + + } + + [Test] public void ServeAsync () {