From 52b52fc03d02e81b1e44a89f15fbe32fc6c95cb3 Mon Sep 17 00:00:00 2001 From: Martin Molinero Date: Wed, 1 Nov 2023 13:24:12 -0300 Subject: [PATCH] Fix data queue handler session refresh - Fix data queue handler session refresh handling. Adding unit test --- .../TradierBrokerageDataQueueHandlerTests.cs | 81 +++++++++++++++++++ .../TradierBrokerageTests.cs | 2 +- .../TradierBrokerage.DataQueueHandler.cs | 19 ++++- .../TradierStreamSession.cs | 10 --- 4 files changed, 100 insertions(+), 12 deletions(-) create mode 100644 QuantConnect.TradierBrokerage.Tests/TradierBrokerageDataQueueHandlerTests.cs diff --git a/QuantConnect.TradierBrokerage.Tests/TradierBrokerageDataQueueHandlerTests.cs b/QuantConnect.TradierBrokerage.Tests/TradierBrokerageDataQueueHandlerTests.cs new file mode 100644 index 0000000..7f5f510 --- /dev/null +++ b/QuantConnect.TradierBrokerage.Tests/TradierBrokerageDataQueueHandlerTests.cs @@ -0,0 +1,81 @@ +/* + * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals. + * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +using NUnit.Framework; +using System.Threading; +using QuantConnect.Data; +using QuantConnect.Logging; +using QuantConnect.Data.Market; +using QuantConnect.Brokerages.Tradier; + +namespace QuantConnect.Tests.Brokerages.Tradier +{ + [TestFixture] + public partial class TradierBrokerageTests : BrokerageTests + { + private static TestCaseData[] TestParameters + { + get + { + return new[] + { + // valid parameters, for example + new TestCaseData(Symbols.AAPL, Resolution.Tick, false), + }; + } + } + + [Test, TestCaseSource(nameof(TestParameters)), Explicit("Long execution time")] + public void StreamsData(Symbol symbol, Resolution resolution, bool throwsException) + { + var cancelationToken = new CancellationTokenSource(); + + var tradier = (TradierBrokerage)Brokerage; + SubscriptionDataConfig[] configs; + if (resolution == Resolution.Tick) + { + var tradeConfig = new SubscriptionDataConfig(GetSubscriptionDataConfig(symbol, resolution), tickType: TickType.Trade); + var quoteConfig = new SubscriptionDataConfig(GetSubscriptionDataConfig(symbol, resolution), tickType: TickType.Quote); + configs = new[] { tradeConfig, quoteConfig }; + } + else + { + configs = new[] { GetSubscriptionDataConfig(symbol, resolution), + GetSubscriptionDataConfig(symbol, resolution) }; + } + + foreach (var config in configs) + { + ProcessFeed(tradier.Subscribe(config, (s, e) => { }), + cancelationToken, + (baseData) => { + if (baseData != null) { Log.Trace($"{baseData}"); } + }); + } + + // long runtime so we assert the session refresh and data stream is not interrupted + Thread.Sleep(1000 * 15 * 60); + + foreach (var config in configs) + { + tradier.Unsubscribe(config); + } + + Thread.Sleep(1000); + + cancelationToken.Cancel(); + } + } +} diff --git a/QuantConnect.TradierBrokerage.Tests/TradierBrokerageTests.cs b/QuantConnect.TradierBrokerage.Tests/TradierBrokerageTests.cs index c2c6082..ccbfe75 100644 --- a/QuantConnect.TradierBrokerage.Tests/TradierBrokerageTests.cs +++ b/QuantConnect.TradierBrokerage.Tests/TradierBrokerageTests.cs @@ -29,7 +29,7 @@ namespace QuantConnect.Tests.Brokerages.Tradier { - public class TradierBrokerageTests : BrokerageTests + public partial class TradierBrokerageTests : BrokerageTests { /// /// Provides the data required to test each order type in various cases diff --git a/QuantConnect.TradierBrokerage/TradierBrokerage.DataQueueHandler.cs b/QuantConnect.TradierBrokerage/TradierBrokerage.DataQueueHandler.cs index ecb53ee..29b886c 100644 --- a/QuantConnect.TradierBrokerage/TradierBrokerage.DataQueueHandler.cs +++ b/QuantConnect.TradierBrokerage/TradierBrokerage.DataQueueHandler.cs @@ -27,6 +27,7 @@ using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; +using System.Threading.Tasks; namespace QuantConnect.Brokerages.Tradier { @@ -37,6 +38,9 @@ public partial class TradierBrokerage { #region IDataQueueHandler implementation + private readonly static TimeSpan SessionTimeout = TimeSpan.FromMinutes(4.8); + private bool _sessionExpired; + private const string WebSocketUrl = "wss://ws.tradier.com/v1/markets/events"; private object _streamSessionLock = new (); @@ -266,10 +270,23 @@ private TradierStreamSession GetStreamSession() { lock (_streamSessionLock) { - if (_streamSession == null || !_streamSession.IsValid) + if (_streamSession == null || _sessionExpired) { var request = new RestRequest("markets/events/session", Method.POST); _streamSession = Execute(request, TradierApiRequestType.Data, "stream"); + + // clear expired state, we just refreshed it + _sessionExpired = false; + + // after timeout, expire the session and refresh subscriptions, which will refresh token + Task.Delay(SessionTimeout).ContinueWith(_ => + { + lock (_streamSessionLock) + { + _sessionExpired = true; + _subscribeProcedure.Set(); + } + }); } return _streamSession; diff --git a/QuantConnect.TradierBrokerage/TradierStreamSession.cs b/QuantConnect.TradierBrokerage/TradierStreamSession.cs index bfb220d..042cc8e 100644 --- a/QuantConnect.TradierBrokerage/TradierStreamSession.cs +++ b/QuantConnect.TradierBrokerage/TradierStreamSession.cs @@ -14,8 +14,6 @@ * */ -using System; - namespace QuantConnect.Brokerages.Tradier { /// @@ -23,9 +21,6 @@ namespace QuantConnect.Brokerages.Tradier /// public class TradierStreamSession { - private readonly static TimeSpan LifeSpan = TimeSpan.FromMinutes(4.9); - private readonly DateTime _createdTime = DateTime.UtcNow; - /// /// Trading Stream: Session Id /// @@ -35,10 +30,5 @@ public class TradierStreamSession /// Trading Stream: Stream URL /// public string Url; - - /// - /// Determines if this session Id is valid - /// - public bool IsValid => DateTime.UtcNow - _createdTime < LifeSpan; } }