Lean  $LEAN_TAG$
LiveCustomDataSubscriptionEnumeratorFactory.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  *
15 */
16 
17 using System;
18 using System.Collections.Generic;
19 using System.Linq;
20 using Python.Runtime;
21 using QuantConnect.Data;
24 using QuantConnect.Util;
25 
27 {
28  /// <summary>
29  /// Provides an implementation of <see cref="ISubscriptionEnumeratorFactory"/> to handle live custom data.
30  /// </summary>
32  {
33  private readonly TimeSpan _minimumIntervalCheck;
34  private readonly ITimeProvider _timeProvider;
35  private readonly Func<DateTime, DateTime> _dateAdjustment;
36  private readonly IObjectStore _objectStore;
37 
38  /// <summary>
39  /// Initializes a new instance of the <see cref="LiveCustomDataSubscriptionEnumeratorFactory"/> class
40  /// </summary>
41  /// <param name="timeProvider">Time provider from data feed; queried for the current UTC time on every enumerator refresh</param>
42  /// <param name="objectStore">The object store to use; it is forwarded to the subscription data source reader</param>
43  /// <param name="dateAdjustment">Optional func that allows adjusting the request start time and the date handed to the data source; when null the values are used unadjusted</param>
44  /// <param name="minimumIntervalCheck">Allows specifying the minimum interval between each enumerator refresh and data check, default is 30 minutes</param>
46  Func<DateTime, DateTime> dateAdjustment = null, TimeSpan? minimumIntervalCheck = null)
47  {
48  _timeProvider = timeProvider;
49  _dateAdjustment = dateAdjustment; // may stay null; call sites guard it with the null-conditional operator
50  _minimumIntervalCheck = minimumIntervalCheck ?? TimeSpan.FromMinutes(30); // fall back to 30 minutes when the caller supplies no interval
51  _objectStore = objectStore;
52  }
53 
/// <summary>
/// Creates an enumerator to read the specified request.
/// </summary>
/// <remarks>
/// The returned <see cref="RefreshEnumerator{T}"/> is given a factory that rebuilds the whole
/// enumerator stack (reader, fast forward or rate limit, optional collection unfolding).
/// A frontier shared across rebuilds prevents time stamps that were already emitted from
/// being emitted again by a refreshed stack.
/// </remarks>
/// <param name="request">The subscription request to be read</param>
/// <param name="dataProvider">Provider used to get data when it is not present on disk</param>
/// <returns>An enumerator reading the subscription request</returns>
public IEnumerator<BaseData> CreateEnumerator(SubscriptionRequest request, IDataProvider dataProvider)
{
    var config = request.Configuration;

    // the frontier starts at the (optionally adjusted) request start time; besides suppressing
    // duplicates between refreshes it lets us fast-forward quickly through remote files
    var requestStart = request.StartTimeLocal;
    var initialFrontier = _dateAdjustment == null ? requestStart : _dateAdjustment(requestStart);
    var frontier = Ref.Create(initialFrontier);

    // captured and updated by the factory below to rate limit rebuilds of the stack
    var previousRefreshUtc = DateTime.MinValue;
    var dataFactory = config.GetBaseDataInstance();

    // the factory handed to the refresh enumerator builds a new stack for each new source
    return new RefreshEnumerator<BaseData>(() =>
    {
        var nowUtc = _timeProvider.GetUtcNow();
        var refreshInterval = GetMinimumTimeBetweenCalls(config.Increment, _minimumIntervalCheck);

        // too soon since the last rebuild: hand back an empty enumerator instead of a new stack
        var sinceLastRefresh = nowUtc - previousRefreshUtc;
        if (sinceLastRefresh < refreshInterval)
        {
            return Enumerable.Empty<BaseData>().GetEnumerator();
        }

        previousRefreshUtc = nowUtc;

        // resolve the source for today's date in the exchange time zone, applying the optional adjustment
        var exchangeDate = nowUtc.ConvertFromUtc(config.ExchangeTimeZone).Date;
        var localDate = _dateAdjustment == null ? exchangeDate : _dateAdjustment(exchangeDate);
        var source = dataFactory.GetSource(config, localDate, true);

        // fetch the new source and enumerate the data source reader
        var stack = EnumerateDataSourceReader(config, dataProvider, frontier, source, localDate, dataFactory);

        if (!SourceRequiresFastForward(source))
        {
            // non-file transports: rate limit calls to this enumerator stack
            stack = new RateLimitEnumerator<BaseData>(stack, _timeProvider, refreshInterval);
        }
        else
        {
            // File transports get fast forward logic. The FastForwardEnumerator implements two features:
            // (1) make sure we never emit past data
            // (2) data filtering based on a maximum data age
            // For custom data feature (2) is unwanted because it would reject data points emitted later
            // (e.g. Quandl daily data after a weekend), so it is disabled with a huge maximum data age.
            var maximumDataAge = GetMaximumDataAge(Time.MaxTimeSpan);
            stack = new FastForwardEnumerator(stack, _timeProvider, config.ExchangeTimeZone, maximumDataAge);
        }

        if (source.Format != FileFormat.UnfoldingCollection)
        {
            return stack;
        }

        // unroll collections into individual data points after fast forward/rate limiting applied
        return stack.SelectMany(data =>
        {
            IEnumerable<BaseData> unrolled;
            if (data is BaseDataCollection collection)
            {
                var dropPastPoints = source.TransportMedium == SubscriptionTransportMedium.Rest
                    || source.TransportMedium == SubscriptionTransportMedium.RemoteFile;

                // for rest and remote file sources make sure the points we *unroll* are not in the past
                unrolled = dropPastPoints
                    ? collection.Data.Where(point => point.EndTime > frontier.Value)
                    : collection.Data;
            }
            else
            {
                // not a collection: pass the single data point through unchanged
                unrolled = new List<BaseData> { data };
            }

            return unrolled.GetEnumerator();
        });
    });
}
140 
/// <summary>
/// Reads <paramref name="source"/> through a subscription data source reader and lazily yields the
/// data points whose end time lies past <paramref name="localFrontier"/>, moving the frontier
/// forward to the latest end time seen.
/// </summary>
/// <param name="config">The subscription configuration handed to the reader</param>
/// <param name="dataProvider">Provider wrapped in a single entry cache and also handed to the reader</param>
/// <param name="localFrontier">Shared frontier; read to filter data and written as data is consumed</param>
/// <param name="source">The source to read</param>
/// <param name="localDate">The date handed to the reader</param>
/// <param name="baseDataInstance">The data instance handed to the reader</param>
/// <returns>An enumerator over the data points that are newer than the frontier</returns>
private IEnumerator<BaseData> EnumerateDataSourceReader(SubscriptionDataConfig config, IDataProvider dataProvider, Ref<DateTime> localFrontier, SubscriptionDataSource source, DateTime localDate, BaseData baseDataInstance)
{
    using var cacheProvider = new SingleEntryDataCacheProvider(dataProvider);

    // accumulated locally so the filter below keeps comparing against the frontier of the previous pass
    var pendingFrontier = localFrontier.Value;
    var reader = GetSubscriptionDataSourceReader(source, cacheProvider, config, localDate, baseDataInstance, dataProvider);
    using var points = SortEnumerator<DateTime>.TryWrapSortEnumerator(source.Sort, reader.Read(source));

    // depends only on the source's transport medium, so evaluate it once for the whole pass
    var fastForward = SourceRequiresFastForward(source);

    foreach (var point in points)
    {
        // always skip past all times emitted on the previous invocation of this enumerator;
        // this allows data at the same time from the same refresh of the source while excluding
        // data from different refreshes of the source
        var isPastFrontier = point != null && point.EndTime > localFrontier.Value;
        if (isPastFrontier)
        {
            yield return point;
        }
        else if (!fastForward)
        {
            // if the 'source' is Rest and there is no new value we *break*, else we would be
            // caught in a tight loop because a Rest source never ends!
            // we 'break' rather than emit null so that the source is refreshed, allowing date
            // changes to impact the source value; the refresh still respects 'minimumTimeBetweenCalls'
            break;
        }

        if (point == null)
        {
            continue;
        }

        pendingFrontier = Time.Max(point.EndTime, pendingFrontier);

        if (!fastForward)
        {
            // if the 'source' is Rest the shared frontier has to be updated right here
            // because a Rest source never ends!
            // Should we advance the frontier for all source types here?
            localFrontier.Value = pendingFrontier;
        }
    }

    // publish the newest end time seen once the pass completes (or breaks out)
    localFrontier.Value = pendingFrontier;
}
185 
186  /// <summary>
187  /// Gets the <see cref="ISubscriptionDataSourceReader"/> for the specified source by delegating to <c>SubscriptionDataSourceReader.ForSource</c>
188  /// </summary>
190  IDataCacheProvider dataCacheProvider,
191  SubscriptionDataConfig config,
192  DateTime date,
193  BaseData baseDataInstance,
194  IDataProvider dataProvider
195  )
196  {
197  return SubscriptionDataSourceReader.ForSource(source, dataCacheProvider, config, date, true, baseDataInstance, dataProvider, _objectStore); // NOTE(review): the literal 'true' is presumably a live-mode flag - confirm against ForSource; the factory's object store is passed last
198  }
199 
/// <summary>
/// Determines whether the source is read through a file transport medium (local or remote file).
/// </summary>
/// <param name="source">The source whose transport medium is inspected</param>
/// <returns>True for local and remote file sources, false for every other transport medium</returns>
private bool SourceRequiresFastForward(SubscriptionDataSource source)
{
    switch (source.TransportMedium)
    {
        case SubscriptionTransportMedium.LocalFile:
        case SubscriptionTransportMedium.RemoteFile:
            return true;

        default:
            return false;
    }
}
205 
/// <summary>
/// Gets the smaller of the two supplied time spans.
/// </summary>
/// <param name="increment">The first candidate interval</param>
/// <param name="minimumInterval">The second candidate interval</param>
/// <returns>Whichever of the two time spans is shorter</returns>
private static TimeSpan GetMinimumTimeBetweenCalls(TimeSpan increment, TimeSpan minimumInterval)
{
    // time spans compare by their tick count, so this matches taking the minimum of the ticks
    return increment <= minimumInterval ? increment : minimumInterval;
}
210 
/// <summary>
/// Gets the supplied time span, raised to five seconds when it is shorter than that.
/// </summary>
/// <param name="increment">The candidate maximum data age</param>
/// <returns>The larger of <paramref name="increment"/> and five seconds</returns>
private static TimeSpan GetMaximumDataAge(TimeSpan increment)
{
    var floor = TimeSpan.FromSeconds(5);

    // time spans compare by their tick count, so this matches taking the maximum of the ticks
    return increment >= floor ? increment : floor;
}
215  }
216 }