-
Notifications
You must be signed in to change notification settings - Fork 37
/
Copy pathMarketListenerPeriodic.cs
137 lines (112 loc) · 4.63 KB
/
MarketListenerPeriodic.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using BetfairNG.Data;
namespace BetfairNG
{
public class MarketListenerPeriodic : IDisposable
{
private readonly PriceProjection _priceProjection;
private readonly BetfairClient _client;
private DateTime _latestDataRequestStart = DateTime.Now;
private DateTime _latestDataRequestFinish = DateTime.Now;
private readonly object _lockObj = new object();
private readonly ConcurrentDictionary<string, IObservable<MarketBook>> _markets =
new ConcurrentDictionary<string, IObservable<MarketBook>>();
private readonly ConcurrentDictionary<string, IObserver<MarketBook>> _observers =
new ConcurrentDictionary<string, IObserver<MarketBook>>();
private readonly IDisposable _polling;
private MarketListenerPeriodic(BetfairClient client,
PriceProjection priceProjection,
double periodInSec)
{
_client = client;
_priceProjection = priceProjection;
_polling = Observable.Interval(TimeSpan.FromSeconds(periodInSec),
NewThreadScheduler.Default).Subscribe(l => DoWork());
}
public static MarketListenerPeriodic Create(BetfairClient client,
PriceProjection priceProjection,
double periodInSec)
{
return new MarketListenerPeriodic(client, priceProjection, periodInSec);
}
public IObservable<Runner> SubscribeRunner(string marketId, long selectionId)
{
var marketTicks = SubscribeMarketBook(marketId);
var observable = Observable.Create<Runner>(
(IObserver<Runner> observer) =>
{
var subscription = marketTicks.Subscribe(tick =>
{
var runner = tick.Runners.First(c => c.SelectionId == selectionId);
// attach the book
runner.MarketBook = tick;
observer.OnNext(runner);
});
return Disposable.Create(() => subscription.Dispose());
})
.Publish()
.RefCount();
return observable;
}
public IObservable<MarketBook> SubscribeMarketBook(string marketId)
{
IObservable<MarketBook> market;
if (_markets.TryGetValue(marketId, out market))
return market;
var observable = Observable.Create<MarketBook>(
(IObserver<MarketBook> observer) =>
{
_observers.AddOrUpdate(marketId, observer, (key, existingVal) => existingVal);
return Disposable.Create(() =>
{
IObserver<MarketBook> ob;
IObservable<MarketBook> o;
_markets.TryRemove(marketId, out o);
_observers.TryRemove(marketId, out ob);
});
})
.Publish()
.RefCount();
_markets.AddOrUpdate(marketId, observable, (key, existingVal) => existingVal);
return observable;
}
private void DoWork()
{
var book = _client.ListMarketBook(_markets.Keys.ToList(), this._priceProjection).Result;
if (book.HasError)
{
foreach (var observer in _observers)
observer.Value.OnError(book.Error);
return;
}
// we may have fresher data than the response to this request
if (book.RequestStart < _latestDataRequestStart && book.LastByte > _latestDataRequestFinish)
return;
lock (_lockObj)
{
_latestDataRequestStart = book.RequestStart;
_latestDataRequestFinish = book.LastByte;
}
foreach (var market in book.Response)
{
IObserver<MarketBook> o;
if (!_observers.TryGetValue(market.MarketId, out o)) continue;
// check to see if the market is finished
if (market.Status == MarketStatus.CLOSED ||
market.Status == MarketStatus.INACTIVE)
o.OnCompleted();
else
o.OnNext(market);
}
}
public void Dispose()
{
if (_polling != null) _polling.Dispose();
}
}
}