Skip to content

Commit

Permalink
separate Quote cache
Browse files Browse the repository at this point in the history
  • Loading branch information
DaveSkender committed May 27, 2024
1 parent b5cc284 commit a8fa2e2
Show file tree
Hide file tree
Showing 11 changed files with 83 additions and 75 deletions.
37 changes: 26 additions & 11 deletions src/_common/Observables/Cache.cs
Original file line number Diff line number Diff line change
@@ -1,32 +1,47 @@
namespace Skender.Stock.Indicators;

// base cache for Quotes
public abstract class QuoteCache<TQuote>
: StreamCache<TQuote>
where TQuote : IQuote, new()
{
internal QuoteCache()
: base(isChainor: false) { }

public IEnumerable<TQuote> Quotes => Cache;
}

// base cache for Indicator results
public abstract class ResultCache<TResult>
: StreamCache<TResult>
where TResult : IResult, new()
{
internal ResultCache(bool isChainor)
: base(isChainor) { }

public IEnumerable<TResult> Results => Cache;
}

// base result or series cache
/// <inheritdoc />
public abstract class SeriesCache<TSeries>
/// <inheritdoc cref="IStreamCache{TSeris}"/>
public abstract class StreamCache<TSeries>
: ChainProvider, IStreamCache<TSeries>
where TSeries : ISeries, new()
{
// fields
private readonly bool isChainor;

// constructor
private protected SeriesCache()
protected internal StreamCache(bool isChainor)
{
Cache = [];
LastArrival = new();
OverflowCount = 0;

Type? reuseType = typeof(TSeries)
.GetInterface("IReusableResult");

isChainor = reuseType != null
&& reuseType.Name == "IReusableResult";
this.isChainor = isChainor;
}

// PROPERTIES

public IEnumerable<TSeries> Results => Cache;

internal List<TSeries> Cache { get; set; }

internal TSeries LastArrival { get; set; }
Expand Down
6 changes: 4 additions & 2 deletions src/_common/Observables/ChainObserver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ namespace Skender.Stock.Indicators;
// CHAIN OBSERVER

public abstract class ChainObserver<TResult>
: SeriesCache<TResult>, IChainObserver<TResult>
: ResultCache<TResult>, IChainObserver<TResult>
where TResult : IResult, new()
{
internal IDisposable? unsubscriber;

private protected ChainObserver(ChainProvider provider)
private protected ChainObserver(
ChainProvider provider,
bool isChainor) : base(isChainor)
{
ChainSupplier = provider;
}
Expand Down
49 changes: 17 additions & 32 deletions src/_common/Observables/Interfaces.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,14 @@ namespace Skender.Stock.Indicators;

// PUBLIC INTERFACES ONLY *****
// Reminder: do not add non-public elements here for internal templating.
// Conversly, non-public members should be defined as internal or private.

/// <summary>
/// Observable provider incremental quotes from external feed sources.
/// </summary>
/// <typeparam name="TQuote" cref="IQuote">
/// Quote based on IQuote interface
/// </typeparam>
public interface IQuoteProvider<TQuote>
: IStreamCache<TQuote>, IObservable<(Act act, TQuote quote)>
where TQuote : IQuote
public interface IQuoteProvider
: IObservable<(Act act, IQuote quote)>
{
/// <summary>
/// Add a single quote. We'll determine if it's new or an update.
/// </summary>
/// <param name="quote">Quote to add or update</param>
Act Add(TQuote quote);

/// <summary>
/// Delete a quote. We'll double-check that it exists in the
/// cache before propogating the event to subscribers.
/// </summary>
/// <param name="quote">Quote to delete</param>
Act Delete(TQuote quote);

/// <summary>
/// Terminates all subscriber connections gracefully.
/// </summary>
Expand All @@ -47,10 +31,9 @@ public interface IChainProvider
/// <summary>
/// Observer of incremental quotes from external feed sources.
/// </summary>
/// <typeparam name="TQuote"></typeparam>
public interface IQuoteObserver<TQuote>
: IObserver<(Act act, TQuote quote)>
where TQuote : IQuote
public interface IQuoteObserver<TResult>
: IStreamCache<TResult>, IObserver<(Act act, IQuote quote)>
where TResult : IResult
{
/// <summary>
/// Unsubscribe from the data provider
Expand All @@ -67,7 +50,10 @@ public interface IQuoteObserver<TQuote>
/// Reset and rebuild the results cache from a point in time.
/// Use RebuildCache() without arguments to reset the entire cache.
/// </summary>
/// <param name="fromTimestamp"></param>
/// <param name="fromTimestamp">
/// All periods (inclusive) after this DateTime will
/// be removed and recalculated.
/// </param>
void RebuildCache(DateTime fromTimestamp);
}

Expand All @@ -94,7 +80,10 @@ public interface IChainObserver<TResult>
/// Reset and rebuild the results cache from a point in time.
/// Use RebuildCache() without arguments to reset the entire cache.
/// </summary>
/// <param name="fromTimestamp"></param>
/// <param name="fromTimestamp">
/// All periods (inclusive) after this DateTime will
/// be removed and recalculated.
/// </param>
void RebuildCache(DateTime fromTimestamp);
}

Expand All @@ -105,12 +94,6 @@ public interface IChainObserver<TResult>
public interface IStreamCache<TSeries>
where TSeries : ISeries
{
/// <summary>
/// Read-only quote or indicator time-series values.
/// They are automatically updated.
/// </summary>
IEnumerable<TSeries> Results { get; }

/// <summary>
/// Returns a short formatted label with parameter values, e.g. EMA(10)
/// </summary>
Expand All @@ -127,6 +110,8 @@ public interface IStreamCache<TSeries>
/// Deletes cached time-series records from point in time, without restore.
/// Subscribed indicators' caches will also be deleted accordingly.
/// </summary>
/// <param name="fromTimestamp"></param>
/// <param name="fromTimestamp">
/// All periods (inclusive) after this DateTime will be removed.
/// </param>
void ClearCache(DateTime fromTimestamp);
}
8 changes: 5 additions & 3 deletions src/_common/Observables/QuoteObserver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ namespace Skender.Stock.Indicators;
// QUOTE OBSERVER

public abstract class QuoteObserver<TQuote, TResult>
: SeriesCache<TResult>, IQuoteObserver<TQuote>
: ResultCache<TResult>, IQuoteObserver<TResult>
where TQuote : IQuote, new()
where TResult : IResult, new()
{
internal IDisposable? unsubscriber;

private protected QuoteObserver(QuoteProvider<TQuote> provider)
internal QuoteObserver(
QuoteProvider<TQuote> provider,
bool isChainor) : base(isChainor)
{
QuoteSupplier = provider;
}
Expand All @@ -22,7 +24,7 @@ private protected QuoteObserver(QuoteProvider<TQuote> provider)

// standard observer properties

public abstract void OnNext((Act act, TQuote quote) value);
public abstract void OnNext((Act act, IQuote quote) value);

public void OnError(Exception error) => throw error;

Expand Down
38 changes: 21 additions & 17 deletions src/_common/Observables/QuoteProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,31 @@ namespace Skender.Stock.Indicators;
// QUOTE PROVIDER

public class QuoteProvider<TQuote>
: SeriesCache<TQuote>, IQuoteProvider<TQuote>
: QuoteCache<TQuote>, IQuoteProvider
where TQuote : IQuote, new()
{
// fields
private readonly List<IObserver<(Act, TQuote)>> observers;
private readonly List<IObserver<(Act, IQuote)>> observers;

// constructor
public QuoteProvider()
{
observers = [];
Cache = [];

Initialize();
}

// PROPERTIES

public IEnumerable<TQuote> Quotes => Cache;

// METHODS

// string label
public override string ToString()
=> $"Quote Provider ({Cache.Count} items)";

// add one
/// <summary>
/// Add a single quote. We'll determine if it's new or an update.
/// </summary>
/// <param name="quote">Quote to add or update</param>
public Act Add(TQuote quote)
{
try
Expand Down Expand Up @@ -62,7 +62,11 @@ public void Add(IEnumerable<TQuote> quotes)
}
}

// delete one
/// <summary>
/// Delete a quote. We'll double-check that it exists in the
/// cache before propogating the event to subscribers.
/// </summary>
/// <param name="quote">Quote to delete</param>
public Act Delete(TQuote quote)
{
try
Expand All @@ -88,7 +92,7 @@ public Act Delete(TQuote quote)
public void Initialize() => ClearCache();

// subscribe observer
public IDisposable Subscribe(IObserver<(Act, TQuote)> observer)
public IDisposable Subscribe(IObserver<(Act, IQuote)> observer)
{
if (!observers.Contains(observer))
{
Expand All @@ -101,7 +105,7 @@ public IDisposable Subscribe(IObserver<(Act, TQuote)> observer)
// unsubscribe all observers
public override void EndTransmission()
{
foreach (IObserver<(Act, TQuote)> obs in observers.ToArray())
foreach (IObserver<(Act, IQuote)> obs in observers.ToArray())
{
if (observers.Contains(obs))
{
Expand Down Expand Up @@ -136,7 +140,7 @@ internal override void RebuildCache(int fromIndex, int offset)
=> throw new InvalidOperationException();

// notify observers
private void NotifyObservers((Act act, TQuote quote) quoteMessage)
private void NotifyObservers((Act act, IQuote quote) quoteMessage)
{
// do not propogate "do nothing" acts
if (quoteMessage.act == Act.DoNothing)
Expand All @@ -145,23 +149,23 @@ private void NotifyObservers((Act act, TQuote quote) quoteMessage)
}

// send to subscribers
List<IObserver<(Act, TQuote)>> obsList = [.. observers];
List<IObserver<(Act, IQuote)>> obsList = [.. observers];

for (int i = 0; i < obsList.Count; i++)
{
IObserver<(Act, TQuote)> obs = obsList[i];
IObserver<(Act, IQuote)> obs = obsList[i];
obs.OnNext(quoteMessage);
}
}

// unsubscriber
private class Unsubscriber(
List<IObserver<(Act, TQuote)>> observers,
IObserver<(Act, TQuote)> observer) : IDisposable
List<IObserver<(Act, IQuote)>> observers,
IObserver<(Act, IQuote)> observer) : IDisposable
{
// can't mutate and iterate on same list, make copy
private readonly List<IObserver<(Act, TQuote)>> observers = observers;
private readonly IObserver<(Act, TQuote)> observer = observer;
private readonly List<IObserver<(Act, IQuote)>> observers = observers;
private readonly IObserver<(Act, IQuote)> observer = observer;

// remove single observer
public void Dispose()
Expand Down
6 changes: 3 additions & 3 deletions src/_common/ObsoleteV3.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# v3 migration guide

We've discontinued all bridge features for v1 backwards compatiblity.
We've discontinued all bridge features for v1 backwards compatibility.
Correct all Warnings from this library before migrating from v2 to v3.
If you are still using v1, migrate to v2 first, to ease the transition to v3.

Expand Down Expand Up @@ -38,9 +38,9 @@ See your compiler `Warning` to identify these in your code.
## Breaking changes

Not all, but some of these will be shown as compiler `Errors` in your code.
Items marked with &#128681; require special attention since they will not produce compiler Errors or Warings.
Items marked with &#128681; require special attention since they will not produce compiler Errors or Warnings.

- all backwards compatible v1 accomodations removed.
- all backwards compatible v1 accommodations removed.

- no longer supporting .NET Standard 2.0 for older .NET Framework compatibility.

Expand Down
4 changes: 2 additions & 2 deletions src/_common/Use (quote converter)/Use.Stream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ public class Use<TQuote> : QuoteObserver<TQuote, UseResult>
public Use(
QuoteProvider<TQuote> provider,
CandlePart candlePart) :
base(provider)
base(provider, isChainor: true)
{
CandlePartSelection = candlePart;

Expand All @@ -33,7 +33,7 @@ public override string ToString()
=> $"USE({Enum.GetName(typeof(CandlePart), CandlePartSelection)})";

// handle quote arrival
public override void OnNext((Act act, TQuote quote) value)
public override void OnNext((Act act, IQuote quote) value)
{
// candidate result
(DateTime d, double v) = value.quote.ToTuple(CandlePartSelection);
Expand Down
3 changes: 2 additions & 1 deletion src/a-d/Alligator/Alligator.Stream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ public Alligator(
int teethOffset,
int lipsPeriods,
int lipsOffset)
: base(provider)
: base(provider,
isChainor: false)
{
Validate(
jawPeriods,
Expand Down
2 changes: 1 addition & 1 deletion src/e-k/Ema/Ema.Stream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ public partial class Ema : ChainObserver<EmaResult>, IEma
public Ema(
ChainProvider provider,
int lookbackPeriods)
: base(provider)
: base(provider, isChainor: true)
{
Validate(lookbackPeriods);

Expand Down
2 changes: 1 addition & 1 deletion src/s-z/Sma/Sma.Stream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ public partial class Sma : ChainObserver<SmaResult>, ISma
public Sma(
ChainProvider provider,
int lookbackPeriods)
: base(provider)
: base(provider, isChainor: true)
{
Validate(lookbackPeriods);

Expand Down
3 changes: 1 addition & 2 deletions tests/indicators/_common/Observables/QuoteProvider.Tests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ public override void QuoteObserver()
}

// confirm public interfaces
Assert.AreEqual(provider.Cache.Count, provider.Results.Count());
Assert.AreEqual(provider.Cache.Count, provider.Quotes.Count());

// close observations
Expand Down Expand Up @@ -104,7 +103,7 @@ public void Overflow()
}
});

Assert.AreEqual(1, provider.Results.Count());
Assert.AreEqual(1, provider.Quotes.Count());

provider.EndTransmission();
}
Expand Down

0 comments on commit a8fa2e2

Please sign in to comment.