diff --git a/src/_common/Observables/IStreamObservable.cs b/src/_common/Observables/IStreamObservable.cs index 1fe143665..bcdc18206 100644 --- a/src/_common/Observables/IStreamObservable.cs +++ b/src/_common/Observables/IStreamObservable.cs @@ -60,6 +60,11 @@ public interface IStreamObservable /// BinarySettings Properties { get; } + /// + /// Gets the maximum size of the Cache list. + /// + int MaxCacheSize { get; } + /// /// Gets the current number of subscribers. /// diff --git a/src/_common/Observables/IStreamObserver.cs b/src/_common/Observables/IStreamObserver.cs index ed5c89a30..4e1a7e568 100644 --- a/src/_common/Observables/IStreamObserver.cs +++ b/src/_common/Observables/IStreamObserver.cs @@ -39,7 +39,15 @@ public interface IStreamObserver /// /// Starting point in timeline to rebuild. /// - void OnChange(DateTime fromTimestamp); + void OnRebuild(DateTime fromTimestamp); + + /// + /// Provides the observer with notification to prune data. + /// + /// + /// Ending point in timeline to prune. + /// + void OnPrune(DateTime toTimestamp); /// /// Provides the observer with errors from the provider diff --git a/src/_common/Observables/StreamHub.Observable.cs b/src/_common/Observables/StreamHub.Observable.cs index efc79e41b..a0534fc8c 100644 --- a/src/_common/Observables/StreamHub.Observable.cs +++ b/src/_common/Observables/StreamHub.Observable.cs @@ -15,6 +15,9 @@ public abstract partial class StreamHub : IStreamObservable /// public IReadOnlyList ReadCache => Cache; + /// + public int MaxCacheSize { get; init; } + /// public virtual BinarySettings Properties { get; init; } = new(0); // default 0b00000000 @@ -94,12 +97,24 @@ private void NotifyObserversOnAdd(TOut item, int? indexHint) /// /// Sends rebuilds point in time to all subscribers. /// - /// Rebuild starting positions. - private void NotifyObserversOnChange(DateTime fromTimestamp) + /// Rebuild starting date. + private void NotifyObserversOnRebuild(DateTime fromTimestamp) + { + foreach (IStreamObserver o in _observers.ToArray()) + { + o.OnRebuild(fromTimestamp); + } + } + + /// + /// Sends prune notification to all subscribers. + /// + /// Prune ending date. + private void NotifyObserversOnPrune(DateTime toTimestamp) { foreach (IStreamObserver o in _observers.ToArray()) { - o.OnChange(fromTimestamp); + o.OnPrune(toTimestamp); } } diff --git a/src/_common/Observables/StreamHub.Observer.cs b/src/_common/Observables/StreamHub.Observer.cs index 522c3a3d8..ccd107218 100644 --- a/src/_common/Observables/StreamHub.Observer.cs +++ b/src/_common/Observables/StreamHub.Observer.cs @@ -35,9 +35,21 @@ public virtual void OnAdd(TIn item, bool notify, int? indexHint) } /// - public void OnChange(DateTime fromTimestamp) + public void OnRebuild(DateTime fromTimestamp) => Rebuild(fromTimestamp); + /// + public void OnPrune(DateTime toTimestamp) + { + while (Cache.Count > 0 && Cache[0].Timestamp <= toTimestamp) + { + Cache.RemoveAt(0); + } + + // notify observers + NotifyObserversOnPrune(toTimestamp); + } + /// public void OnError(Exception exception) => throw exception; diff --git a/src/_common/Observables/StreamHub.cs b/src/_common/Observables/StreamHub.cs index d1437cf01..7e58343e4 100644 --- a/src/_common/Observables/StreamHub.cs +++ b/src/_common/Observables/StreamHub.cs @@ -25,6 +25,9 @@ private protected StreamHub(IStreamObservable provider) // inherit settings (reinstantiate struct on heap) Properties = Properties.Combine(provider.Properties); + + // inherit max cache size + MaxCacheSize = provider.MaxCacheSize; } #endregion @@ -129,7 +132,7 @@ public void Insert(TIn newIn) } Cache.Insert(index, result); - NotifyObserversOnChange(result.Timestamp); + NotifyObserversOnRebuild(result.Timestamp); } // normal add @@ -247,6 +250,9 @@ Provider terminated. return !Properties[1]; } + // maintenance pruning + PruneCache(); + // not repeating OverflowCount = 0; LastItem = item; @@ -264,7 +270,7 @@ Provider terminated. public void Remove(TOut cachedItem) { Cache.Remove(cachedItem); - NotifyObserversOnChange(cachedItem.Timestamp); + NotifyObserversOnRebuild(cachedItem.Timestamp); } /// @@ -276,7 +282,7 @@ public void RemoveAt(int cacheIndex) { TOut cachedItem = Cache[cacheIndex]; Cache.RemoveAt(cacheIndex); - NotifyObserversOnChange(cachedItem.Timestamp); + NotifyObserversOnRebuild(cachedItem.Timestamp); } /// @@ -296,7 +302,7 @@ public void RemoveRange(DateTime fromTimestamp, bool notify) // notify observers if (notify) { - NotifyObserversOnChange(fromTimestamp); + NotifyObserversOnRebuild(fromTimestamp); } } @@ -321,6 +327,27 @@ public void RemoveRange(int fromIndex, bool notify) RemoveRange(fromTimestamp, notify); } + + /// + /// Prunes the cache to the maximum size. + /// + protected void PruneCache() + { + if (Cache.Count < MaxCacheSize) + { + return; + } + + DateTime toTimestamp = DateTime.MinValue; + + while (Cache.Count >= MaxCacheSize) + { + toTimestamp = Cache[0].Timestamp; + Cache.RemoveAt(0); + } + + NotifyObserversOnPrune(toTimestamp); + } #endregion #region REBUILD & REINITIALIZE @@ -372,7 +399,7 @@ public void Rebuild(DateTime fromTimestamp) } // notify observers - NotifyObserversOnChange(fromTimestamp); + NotifyObserversOnRebuild(fromTimestamp); } /// @@ -413,6 +440,7 @@ protected virtual void RollbackState(DateTime timestamp) #region chain and quote variants /// +/// Streaming data provider. public abstract class QuoteProvider( IStreamObservable provider ) : StreamHub(provider), IQuoteProvider diff --git a/src/_common/Quotes/Quote.StreamHub.cs b/src/_common/Quotes/Quote.StreamHub.cs index a3e3c7030..81004d021 100644 --- a/src/_common/Quotes/Quote.StreamHub.cs +++ b/src/_common/Quotes/Quote.StreamHub.cs @@ -25,9 +25,26 @@ public class QuoteHub where TQuote : IQuote { /// - /// Initializes a new instance of the class. + /// Initializes a new instance of the base, without its own provider. /// - public QuoteHub() : base(new EmptyQuoteProvider()) { } + /// Maximum in-memory cache size. + public QuoteHub(int? maxCacheSize = null) + : base(new BaseProvider()) + { + + const int maxCacheSizeDefault = (int)(0.9 * int.MaxValue); + + if (maxCacheSize is not null and > maxCacheSizeDefault) + { + string message + = $"'{nameof(maxCacheSize)}' must be less than {maxCacheSizeDefault}."; + + throw new ArgumentOutOfRangeException( + nameof(maxCacheSize), maxCacheSize, message); + } + + MaxCacheSize = maxCacheSize ?? maxCacheSizeDefault; + } /// /// Initializes a new instance of the class with a specified provider. @@ -58,20 +75,22 @@ public override string ToString() } /// -/// Empty quote provider for base Quote Hub initialization. +/// Inert provider for base Hub initialization. /// -/// Internal use only. Do not use directly. -/// -public class EmptyQuoteProvider - : IQuoteProvider - where TQuote : IQuote +/// +public class BaseProvider + : IStreamObservable + where T : IReusable { /// - /// Default quote provider is parent-less Quote Hub. + /// Inert quote provider is parent-less Hub. /// It does not transfer its setting to its children. /// public BinarySettings Properties { get; } = new(0b00000001, 0b11111110); + /// + public int MaxCacheSize => 0; + /// /// Gets the number of observers. /// @@ -85,27 +104,27 @@ public class EmptyQuoteProvider /// /// Gets the list of quotes. /// - public IReadOnlyList Quotes { get; } = Array.Empty(); + public IReadOnlyList Quotes { get; } = Array.Empty(); /// /// Gets a reference to the cache. /// /// A read-only list of quotes. - public IReadOnlyList GetCacheRef() => Array.Empty(); + public IReadOnlyList GetCacheRef() => Array.Empty(); /// /// Determines whether the specified observer is a subscriber. /// /// The observer to check. /// true if the observer is a subscriber; otherwise, false. - public bool HasSubscriber(IStreamObserver observer) => false; + public bool HasSubscriber(IStreamObserver observer) => false; /// /// Subscribes the specified observer. /// /// The observer to subscribe. /// A disposable object that can be used to unsubscribe. - public IDisposable Subscribe(IStreamObserver observer) + public IDisposable Subscribe(IStreamObserver observer) => throw new InvalidOperationException(); /// @@ -113,7 +132,7 @@ public IDisposable Subscribe(IStreamObserver observer) /// /// The observer to unsubscribe. /// true if the observer was unsubscribed; otherwise, false. - public bool Unsubscribe(IStreamObserver observer) + public bool Unsubscribe(IStreamObserver observer) => throw new InvalidOperationException(); /// diff --git a/tests/indicators/_common/Observables/StreamHub.CacheMgmt.Tests.cs b/tests/indicators/_common/Observables/StreamHub.CacheMgmt.Tests.cs index 188ba1470..73cc1ff19 100644 --- a/tests/indicators/_common/Observables/StreamHub.CacheMgmt.Tests.cs +++ b/tests/indicators/_common/Observables/StreamHub.CacheMgmt.Tests.cs @@ -147,4 +147,58 @@ public void OverflowedAndReset() provider.EndTransmission(); } + + [TestMethod] + public void MaxCacheSize() + { + int maxCacheSize = 30; + + // initialize + QuoteHub provider = new(maxCacheSize); + SmaHub observer = provider.ToSma(20); + + // sets max cache size + provider.MaxCacheSize.Should().Be(maxCacheSize); + + // inherits max cache size + observer.MaxCacheSize.Should().Be(maxCacheSize); + } + + [TestMethod] + public void PrunedCache() + { + int maxCacheSize = 30; + + // initialize + QuoteHub provider = new(maxCacheSize); + SmaHub observer = provider.ToSma(20); + IReadOnlyList seriesList = Quotes.ToSma(20); + + // add quotes + provider.Add(Quotes.Take(maxCacheSize)); + + // assert: cache size is full size + provider.Quotes.Should().HaveCount(maxCacheSize); + observer.Results.Should().HaveCount(maxCacheSize); + + // add more quotes to exceed max cache size + provider.Add(Quotes.Skip(maxCacheSize).Take(10)); + + // assert: cache size is pruned + provider.Results.Should().HaveCount(maxCacheSize); + observer.Results.Should().HaveCount(maxCacheSize); + + // assert: correct values remain + provider.Quotes.Should().BeEquivalentTo( + Quotes.Skip(10).Take(maxCacheSize)); + + observer.Results.Should().BeEquivalentTo( + seriesList.Skip(10).Take(maxCacheSize)); + } + + [TestMethod] + public void PrunedAsymmetric() => + // TODO: asymetric results (e.g. Renko) + // pruned to correct date, instead of count + Assert.Inconclusive("not implemented"); }