Lean  $LEAN_TAG$
SubscriptionUtils.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  *
15 */
16 
17 using System;
18 using System.Collections.Generic;
19 using QuantConnect.Data;
25 using QuantConnect.Logging;
26 using QuantConnect.Util;
27 
29 {
30  /// <summary>
31  /// Utilities related to data <see cref="Subscription"/>
32  /// </summary>
33  public static class SubscriptionUtils
34  {
/// <summary>
/// Builds a <see cref="Subscription"/> that consumes the supplied enumerator directly
/// </summary>
/// <param name="request">The subscription data request</param>
/// <param name="enumerator">The data enumerator stack; when null an already ended subscription is returned</param>
/// <param name="dailyStrictEndTimeEnabled">Flag handed through unchanged to the <see cref="SubscriptionDataEnumerator"/></param>
/// <returns>A subscription ready to be consumed, or an ended one when there is no enumerator</returns>
public static Subscription Create(
    SubscriptionRequest request,
    IEnumerator<BaseData> enumerator,
    bool dailyStrictEndTimeEnabled)
{
    if (enumerator != null)
    {
        var hours = request.Security.Exchange.Hours;
        var offsetProvider = new TimeZoneOffsetProvider(
            request.Configuration.ExchangeTimeZone,
            request.StartTimeUtc,
            request.EndTimeUtc);

        // wrap the raw enumerator so the subscription receives it through a SubscriptionDataEnumerator
        var subscriptionEnumerator = new SubscriptionDataEnumerator(
            request.Configuration,
            hours,
            offsetProvider,
            enumerator,
            request.IsUniverseSubscription,
            dailyStrictEndTimeEnabled);

        return new Subscription(request, subscriptionEnumerator, offsetProvider);
    }

    // nothing to read from: hand back a subscription that is already ended
    return GetEndedSubscription(request);
}
62 
63  /// <summary>
64  /// Sets up a new <see cref="Subscription"/> which will consume a blocking <see cref="EnqueueableEnumerator{T}"/>
65  /// that will be fed by a worker task
66  /// </summary>
67  /// <param name="request">The subscription data request</param>
68  /// <param name="enumerator">The data enumerator stack</param>
69  /// <param name="factorFileProvider">The factor file provider</param>
70  /// <param name="enablePriceScale">Enables price factoring</param>
71  /// <returns>A new subscription instance ready to consume</returns>
73  SubscriptionRequest request,
74  IEnumerator<BaseData> enumerator,
75  IFactorFileProvider factorFileProvider,
76  bool enablePriceScale,
77  bool dailyStrictEndTimeEnabled)
78  {
 // No enumerator to read from: return a subscription that has already been disposed.
79  if(enumerator == null)
80  {
81  return GetEndedSubscription(request);
82  }
83  var exchangeHours = request.Security.Exchange.Hours;
 // Queue shared by the worker delegate below (which enqueues) and the returned subscription (which is built on it).
 // NOTE(review): the 'true' argument is presumably the "blocking" flag mentioned in the summary - confirm.
84  var enqueueable = new EnqueueableEnumerator<SubscriptionData>(true);
85  var timeZoneOffsetProvider = new TimeZoneOffsetProvider(request.Configuration.ExchangeTimeZone, request.StartTimeUtc, request.EndTimeUtc);
86  var subscription = new Subscription(request, enqueueable, timeZoneOffsetProvider);
 // NOTE(review): the code below reads 'config' but writes 'request.Configuration.PriceScaleFactor';
 // presumably both refer to the same configuration instance - confirm.
87  var config = subscription.Configuration;
 // Scaling is applied only when the caller asked for it AND the configuration reports that prices should be scaled.
88  enablePriceScale = enablePriceScale && config.PricesShouldBeScaled();
 // DateTime.MinValue doubles as the "no price scale factor computed yet" marker (see the check inside the loop).
89  var lastTradableDate = DateTime.MinValue;
90 
 // Worker delegate. Each call moves items from the enumerator into the queue and returns:
 //   true  - it stopped because 'count' exceeded workBatchSize (the enumerator was not exhausted)
 //   false - the subscription had already finished, the enumerator ran out, or an exception was caught
91  Func<int, bool> produce = (workBatchSize) =>
92  {
93  try
94  {
 // counts the items enqueued during this call only
95  var count = 0;
96  while (enumerator.MoveNext())
97  {
98  // subscription has been removed, no need to continue enumerating
99  if (enqueueable.HasFinished)
100  {
101  enumerator.DisposeSafely();
102  return false;
103  }
104 
105  var data = enumerator.Current;
106 
107  // Use our config filter to see if we should emit this
108  // This currently catches Auxiliary data that we don't want to emit
 // A null item skips the filter and continues through the rest of the loop body.
109  if (data != null && !config.ShouldEmitData(data, request.IsUniverseSubscription))
110  {
111  continue;
112  }
113 
114  // In the event we have "Raw" configuration, we will force our subscription data
115  // to precalculate adjusted data. The data will still be emitted as raw, but
116  // if the config is changed at any point it can emit adjusted data as well
117  // See SubscriptionData.Create() and PrecalculatedSubscriptionData for more
118  var requestMode = config.DataNormalizationMode;
119  if (config.SecurityType == SecurityType.Equity)
120  {
121  requestMode = requestMode != DataNormalizationMode.Raw ? requestMode : DataNormalizationMode.Adjusted;
122  }
123 
 // NOTE(review): 'data' can be null at this point (the filter above lets null through); this call
 // presumably tolerates a null receiver - confirm against GetUpdatePriceScaleFrontier.
124  var priceScaleFrontierDate = data.GetUpdatePriceScaleFrontier().Date;
125 
126  // We update our price scale factor when the date changes for non fill forward bars or if we haven't initialized yet.
127  // We don't take into account auxiliary data because we don't scale it and because the underlying price data could be fill forwarded
128  if (enablePriceScale && priceScaleFrontierDate > lastTradableDate && data.DataType != MarketDataType.Auxiliary && (!data.IsFillForward || lastTradableDate == DateTime.MinValue))
129  {
130  var factorFile = factorFileProvider.Get(request.Configuration.Symbol);
131  lastTradableDate = priceScaleFrontierDate;
132  request.Configuration.PriceScaleFactor = factorFile.GetPriceScale(lastTradableDate, requestMode, config.ContractDepthOffset, config.DataMappingMode);
133  }
134 
 // The last argument is the current price scale factor, or null when scaling is disabled.
135  SubscriptionData subscriptionData = SubscriptionData.Create(dailyStrictEndTimeEnabled,
136  config,
137  exchangeHours,
138  subscription.OffsetProvider,
139  data,
140  requestMode,
141  enablePriceScale ? request.Configuration.PriceScaleFactor : null);
142 
143  // drop the data into the back of the enqueueable
144  enqueueable.Enqueue(subscriptionData);
145 
146  count++;
147  // stop executing if added more data than the work batch size, we don't want to fill the ram
148  if (count > workBatchSize)
149  {
150  return true;
151  }
152  }
153  }
 // Exceptions are logged and not rethrown; execution falls through to the stop/dispose code below.
154  catch (Exception exception)
155  {
156  Log.Error(exception, $"Subscription worker task exception {request.Configuration}.");
157  }
158 
159  // we made it here because MoveNext returned false or we exploded, stop the enqueueable
160  enqueueable.Stop();
161  // we have to dispose of the enumerator
162  enumerator.DisposeSafely();
163  return false;
164  };
165 
 // Register the worker delegate with the scheduler, keyed by the configuration symbol, together with
 // a callback that reports the current queue count (or 0 once the queue has finished).
166  WeightedWorkScheduler.Instance.QueueWork(config.Symbol, produce,
167  // if the subscription finished we return 0, so the work is prioritized and gets removed
168  () =>
169  {
170  if (enqueueable.HasFinished)
171  {
172  return 0;
173  }
174  return enqueueable.Count;
175  }
176  );
177 
178  return subscription;
179  }
180 
/// <summary>
/// Produces a subscription that is already ended, so the data worker does not blow up at runtime.
/// This can happen if there's no tradable date
/// </summary>
/// <param name="request">The subscription data request</param>
/// <returns>A subscription created without enumerator or offset provider and disposed before being returned</returns>
private static Subscription GetEndedSubscription(SubscriptionRequest request)
{
    // no data enumerator and no offset provider are supplied
    var endedSubscription = new Subscription(request, null, null);

    // disposing up front is what flags the subscription as ended
    endedSubscription.Dispose();

    return endedSubscription;
}
191  }
192 }