Skip to content

Commit

Permalink
Fix data queue handler session refresh
Browse files Browse the repository at this point in the history
- Fix data queue handler session refresh handling. Adding unit test
  • Loading branch information
Martin-Molinero committed Nov 1, 2023
1 parent ae1be52 commit 52b52fc
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using NUnit.Framework;
using System.Threading;
using QuantConnect.Data;
using QuantConnect.Logging;
using QuantConnect.Data.Market;
using QuantConnect.Brokerages.Tradier;

namespace QuantConnect.Tests.Brokerages.Tradier
{
[TestFixture]
public partial class TradierBrokerageTests : BrokerageTests
{
private static TestCaseData[] TestParameters
{
get
{
return new[]
{
// valid parameters, for example
new TestCaseData(Symbols.AAPL, Resolution.Tick, false),
};
}
}

[Test, TestCaseSource(nameof(TestParameters)), Explicit("Long execution time")]
public void StreamsData(Symbol symbol, Resolution resolution, bool throwsException)
{
var cancelationToken = new CancellationTokenSource();

var tradier = (TradierBrokerage)Brokerage;
SubscriptionDataConfig[] configs;
if (resolution == Resolution.Tick)
{
var tradeConfig = new SubscriptionDataConfig(GetSubscriptionDataConfig<Tick>(symbol, resolution), tickType: TickType.Trade);
var quoteConfig = new SubscriptionDataConfig(GetSubscriptionDataConfig<Tick>(symbol, resolution), tickType: TickType.Quote);
configs = new[] { tradeConfig, quoteConfig };
}
else
{
configs = new[] { GetSubscriptionDataConfig<QuoteBar>(symbol, resolution),
GetSubscriptionDataConfig<TradeBar>(symbol, resolution) };
}

foreach (var config in configs)
{
ProcessFeed(tradier.Subscribe(config, (s, e) => { }),
cancelationToken,
(baseData) => {
if (baseData != null) { Log.Trace($"{baseData}"); }
});
}

// long runtime so we assert the session refresh and data stream is not interrupted
Thread.Sleep(1000 * 15 * 60);

foreach (var config in configs)
{
tradier.Unsubscribe(config);
}

Thread.Sleep(1000);

cancelationToken.Cancel();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

namespace QuantConnect.Tests.Brokerages.Tradier
{
public class TradierBrokerageTests : BrokerageTests
public partial class TradierBrokerageTests : BrokerageTests
{
/// <summary>
/// Provides the data required to test each order type in various cases
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace QuantConnect.Brokerages.Tradier
{
Expand All @@ -37,6 +38,9 @@ public partial class TradierBrokerage
{
#region IDataQueueHandler implementation

private readonly static TimeSpan SessionTimeout = TimeSpan.FromMinutes(4.8);
private bool _sessionExpired;

private const string WebSocketUrl = "wss://ws.tradier.com/v1/markets/events";

private object _streamSessionLock = new ();
Expand Down Expand Up @@ -266,10 +270,23 @@ private TradierStreamSession GetStreamSession()
{
lock (_streamSessionLock)
{
if (_streamSession == null || !_streamSession.IsValid)
if (_streamSession == null || _sessionExpired)
{
var request = new RestRequest("markets/events/session", Method.POST);
_streamSession = Execute<TradierStreamSession>(request, TradierApiRequestType.Data, "stream");

// clear expired state, we just refreshed it
_sessionExpired = false;

// after timeout, expire the session and refresh subscriptions, which will refresh token
Task.Delay(SessionTimeout).ContinueWith(_ =>
{
lock (_streamSessionLock)
{
_sessionExpired = true;
_subscribeProcedure.Set();
}
});
}

return _streamSession;
Expand Down
10 changes: 0 additions & 10 deletions QuantConnect.TradierBrokerage/TradierStreamSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,13 @@
*
*/

using System;

namespace QuantConnect.Brokerages.Tradier
{
/// <summary>
/// Create a new stream session
/// </summary>
public class TradierStreamSession
{
private readonly static TimeSpan LifeSpan = TimeSpan.FromMinutes(4.9);
private readonly DateTime _createdTime = DateTime.UtcNow;

/// <summary>
/// Trading Stream: Session Id
/// </summary>
Expand All @@ -35,10 +30,5 @@ public class TradierStreamSession
/// Trading Stream: Stream URL
/// </summary>
public string Url;

/// <summary>
/// Determines if this session Id is valid
/// </summary>
public bool IsValid => DateTime.UtcNow - _createdTime < LifeSpan;
}
}

0 comments on commit 52b52fc

Please sign in to comment.