Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Max cache size for StreamHub #1290

Merged
merged 4 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/_common/Observables/IStreamObservable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ public interface IStreamObservable<out T>
/// </remarks>
BinarySettings Properties { get; }

/// <summary>
/// Gets the maximum size of the Cache list.
/// </summary>
int MaxCacheSize { get; }

/// <summary>
/// Gets the current number of subscribers.
/// </summary>
Expand Down
10 changes: 9 additions & 1 deletion src/_common/Observables/IStreamObserver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,15 @@ public interface IStreamObserver<in T>
/// <param name="fromTimestamp">
/// Starting point in timeline to rebuild.
/// </param>
void OnChange(DateTime fromTimestamp);
void OnRebuild(DateTime fromTimestamp);

/// <summary>
/// Provides the observer with notification to prune data.
/// </summary>
/// <param name="toTimestamp">
/// Ending point in timeline to prune.
/// </param>
void OnPrune(DateTime toTimestamp);

/// <summary>
/// Provides the observer with errors from the provider
Expand Down
21 changes: 18 additions & 3 deletions src/_common/Observables/StreamHub.Observable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ public abstract partial class StreamHub<TIn, TOut> : IStreamObservable<TOut>
/// <inheritdoc/>
public IReadOnlyList<TOut> ReadCache => Cache;

/// <inheritdoc/>
public int MaxCacheSize { get; init; }

/// <inheritdoc/>
public virtual BinarySettings Properties { get; init; } = new(0); // default 0b00000000

Expand Down Expand Up @@ -94,12 +97,24 @@ private void NotifyObserversOnAdd(TOut item, int? indexHint)
/// <summary>
/// Sends rebuilds point in time to all subscribers.
/// </summary>
/// <param name="fromTimestamp">Rebuild starting positions.</param>
private void NotifyObserversOnChange(DateTime fromTimestamp)
/// <param name="fromTimestamp">Rebuild starting date.</param>
private void NotifyObserversOnRebuild(DateTime fromTimestamp)
{
foreach (IStreamObserver<TOut> o in _observers.ToArray())
{
o.OnRebuild(fromTimestamp);
}
}

/// <summary>
/// Sends prune notification to all subscribers.
/// </summary>
/// <param name="toTimestamp">Prune ending date.</param>
private void NotifyObserversOnPrune(DateTime toTimestamp)
{
foreach (IStreamObserver<TOut> o in _observers.ToArray())
{
o.OnChange(fromTimestamp);
o.OnPrune(toTimestamp);
}
}

Expand Down
14 changes: 13 additions & 1 deletion src/_common/Observables/StreamHub.Observer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,21 @@ public virtual void OnAdd(TIn item, bool notify, int? indexHint)
}

/// <inheritdoc/>
public void OnChange(DateTime fromTimestamp)
public void OnRebuild(DateTime fromTimestamp)
=> Rebuild(fromTimestamp);

/// <inheritdoc/>
public void OnPrune(DateTime toTimestamp)
{
while (Cache.Count > 0 && Cache[0].Timestamp <= toTimestamp)
{
Cache.RemoveAt(0);
}

// notify observers
NotifyObserversOnPrune(toTimestamp);
}

/// <inheritdoc/>
public void OnError(Exception exception)
=> throw exception;
Expand Down
38 changes: 33 additions & 5 deletions src/_common/Observables/StreamHub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ private protected StreamHub(IStreamObservable<TIn> provider)

// inherit settings (reinstantiate struct on heap)
Properties = Properties.Combine(provider.Properties);

// inherit max cache size
MaxCacheSize = provider.MaxCacheSize;
}

#endregion
Expand Down Expand Up @@ -129,7 +132,7 @@ public void Insert(TIn newIn)
}

Cache.Insert(index, result);
NotifyObserversOnChange(result.Timestamp);
NotifyObserversOnRebuild(result.Timestamp);
}

// normal add
Expand Down Expand Up @@ -247,6 +250,9 @@ Provider terminated.
return !Properties[1];
}

// maintenance pruning
PruneCache();

// not repeating
OverflowCount = 0;
LastItem = item;
Expand All @@ -264,7 +270,7 @@ Provider terminated.
public void Remove(TOut cachedItem)
{
Cache.Remove(cachedItem);
NotifyObserversOnChange(cachedItem.Timestamp);
NotifyObserversOnRebuild(cachedItem.Timestamp);
}

/// <summary>
Expand All @@ -276,7 +282,7 @@ public void RemoveAt(int cacheIndex)
{
TOut cachedItem = Cache[cacheIndex];
Cache.RemoveAt(cacheIndex);
NotifyObserversOnChange(cachedItem.Timestamp);
NotifyObserversOnRebuild(cachedItem.Timestamp);
}

/// <summary>
Expand All @@ -296,7 +302,7 @@ public void RemoveRange(DateTime fromTimestamp, bool notify)
// notify observers
if (notify)
{
NotifyObserversOnChange(fromTimestamp);
NotifyObserversOnRebuild(fromTimestamp);
}
}

Expand All @@ -321,6 +327,27 @@ public void RemoveRange(int fromIndex, bool notify)

RemoveRange(fromTimestamp, notify);
}

/// <summary>
/// Prunes the cache to the maximum size.
/// </summary>
protected void PruneCache()
{
if (Cache.Count < MaxCacheSize)
{
return;
}

DateTime toTimestamp = DateTime.MinValue;

while (Cache.Count >= MaxCacheSize)
{
toTimestamp = Cache[0].Timestamp;
Cache.RemoveAt(0);
}

NotifyObserversOnPrune(toTimestamp);
}
#endregion

#region REBUILD & REINITIALIZE
Expand Down Expand Up @@ -372,7 +399,7 @@ public void Rebuild(DateTime fromTimestamp)
}

// notify observers
NotifyObserversOnChange(fromTimestamp);
NotifyObserversOnRebuild(fromTimestamp);
}

/// <summary>
Expand Down Expand Up @@ -413,6 +440,7 @@ protected virtual void RollbackState(DateTime timestamp)
#region chain and quote variants

/// <inheritdoc cref="IStreamHub{TIn, TOut}"/>
/// <param name="provider">Streaming data provider.</param>
public abstract class QuoteProvider<TIn, TOut>(
IStreamObservable<TIn> provider
) : StreamHub<TIn, TOut>(provider), IQuoteProvider<TOut>
Expand Down
47 changes: 33 additions & 14 deletions src/_common/Quotes/Quote.StreamHub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,26 @@ public class QuoteHub<TQuote>
where TQuote : IQuote
{
/// <summary>
/// Initializes a new instance of the <see cref="QuoteHub{TQuote}"/> class.
/// Initializes a new instance of the <see cref="QuoteHub{TQuote}"/> base, without its own provider.
/// </summary>
public QuoteHub() : base(new EmptyQuoteProvider<TQuote>()) { }
/// <param name="maxCacheSize">Maximum in-memory cache size.</param>
public QuoteHub(int? maxCacheSize = null)
: base(new BaseProvider<TQuote>())
{

const int maxCacheSizeDefault = (int)(0.9 * int.MaxValue);

if (maxCacheSize is not null and > maxCacheSizeDefault)
{
string message
= $"'{nameof(maxCacheSize)}' must be less than {maxCacheSizeDefault}.";

throw new ArgumentOutOfRangeException(
nameof(maxCacheSize), maxCacheSize, message);
}

MaxCacheSize = maxCacheSize ?? maxCacheSizeDefault;
}

/// <summary>
/// Initializes a new instance of the <see cref="QuoteHub{TQuote}"/> class with a specified provider.
Expand Down Expand Up @@ -58,20 +75,22 @@ public override string ToString()
}

/// <summary>
/// Empty quote provider for base Quote Hub initialization.
/// Inert provider for base Hub initialization.
/// </summary>
/// <remarks>Internal use only. Do not use directly.</remarks>
/// <typeparam name="TQuote"></typeparam>
public class EmptyQuoteProvider<TQuote>
: IQuoteProvider<TQuote>
where TQuote : IQuote
/// <typeparam name="T"></typeparam>
public class BaseProvider<T>
: IStreamObservable<T>
where T : IReusable
{
/// <summary>
/// Default quote provider is parent-less Quote Hub.
/// Inert quote provider is parent-less Hub.
/// It does not transfer its setting to its children.
/// </summary>
public BinarySettings Properties { get; } = new(0b00000001, 0b11111110);

/// <inheritdoc/>
public int MaxCacheSize => 0;

/// <summary>
/// Gets the number of observers.
/// </summary>
Expand All @@ -85,35 +104,35 @@ public class EmptyQuoteProvider<TQuote>
/// <summary>
/// Gets the list of quotes.
/// </summary>
public IReadOnlyList<TQuote> Quotes { get; } = Array.Empty<TQuote>();
public IReadOnlyList<T> Quotes { get; } = Array.Empty<T>();

/// <summary>
/// Gets a reference to the cache.
/// </summary>
/// <returns>A read-only list of quotes.</returns>
public IReadOnlyList<TQuote> GetCacheRef() => Array.Empty<TQuote>();
public IReadOnlyList<T> GetCacheRef() => Array.Empty<T>();

/// <summary>
/// Determines whether the specified observer is a subscriber.
/// </summary>
/// <param name="observer">The observer to check.</param>
/// <returns><c>true</c> if the observer is a subscriber; otherwise, <c>false</c>.</returns>
public bool HasSubscriber(IStreamObserver<TQuote> observer) => false;
public bool HasSubscriber(IStreamObserver<T> observer) => false;

/// <summary>
/// Subscribes the specified observer.
/// </summary>
/// <param name="observer">The observer to subscribe.</param>
/// <returns>A disposable object that can be used to unsubscribe.</returns>
public IDisposable Subscribe(IStreamObserver<TQuote> observer)
public IDisposable Subscribe(IStreamObserver<T> observer)
=> throw new InvalidOperationException();

/// <summary>
/// Unsubscribes the specified observer.
/// </summary>
/// <param name="observer">The observer to unsubscribe.</param>
/// <returns><c>true</c> if the observer was unsubscribed; otherwise, <c>false</c>.</returns>
public bool Unsubscribe(IStreamObserver<TQuote> observer)
public bool Unsubscribe(IStreamObserver<T> observer)
=> throw new InvalidOperationException();

/// <summary>
Expand Down
54 changes: 54 additions & 0 deletions tests/indicators/_common/Observables/StreamHub.CacheMgmt.Tests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -147,4 +147,58 @@ public void OverflowedAndReset()

provider.EndTransmission();
}

[TestMethod]
public void MaxCacheSize()
{
int maxCacheSize = 30;

// initialize
QuoteHub<Quote> provider = new(maxCacheSize);
SmaHub<Quote> observer = provider.ToSma(20);

// sets max cache size
provider.MaxCacheSize.Should().Be(maxCacheSize);

// inherits max cache size
observer.MaxCacheSize.Should().Be(maxCacheSize);
}

[TestMethod]
public void PrunedCache()
{
int maxCacheSize = 30;

// initialize
QuoteHub<Quote> provider = new(maxCacheSize);
SmaHub<Quote> observer = provider.ToSma(20);
IReadOnlyList<SmaResult> seriesList = Quotes.ToSma(20);

// add quotes
provider.Add(Quotes.Take(maxCacheSize));

// assert: cache size is full size
provider.Quotes.Should().HaveCount(maxCacheSize);
observer.Results.Should().HaveCount(maxCacheSize);

// add more quotes to exceed max cache size
provider.Add(Quotes.Skip(maxCacheSize).Take(10));

// assert: cache size is pruned
provider.Results.Should().HaveCount(maxCacheSize);
observer.Results.Should().HaveCount(maxCacheSize);

// assert: correct values remain
provider.Quotes.Should().BeEquivalentTo(
Quotes.Skip(10).Take(maxCacheSize));

observer.Results.Should().BeEquivalentTo(
seriesList.Skip(10).Take(maxCacheSize));
}

[TestMethod]
public void PrunedAsymmetric() =>
// TODO: asymetric results (e.g. Renko)
// pruned to correct date, instead of count
Assert.Inconclusive("not implemented");
}
Loading