Lean  $LEAN_TAG$
SubscriptionDataReader.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  *
15 */
16 
17 using System;
18 using System.Linq;
19 using QuantConnect.Util;
20 using QuantConnect.Data;
21 using System.Collections;
22 using System.Globalization;
23 using QuantConnect.Logging;
25 using System.Collections.Generic;
32 
34 {
35  /// <summary>
36  /// Subscription data reader is a wrapper on the stream reader class to download, unpack and iterate over a data file.
37  /// </summary>
38  /// <remarks>The class accepts any subscription configuration and automatically makes it available to enumerate</remarks>
39  public class SubscriptionDataReader : IEnumerator<BaseData>, ITradableDatesNotifier, IDataProviderEvents
40  {
41  private IDataProvider _dataProvider;
42  private IObjectStore _objectStore;
43  private bool _initialized;
44 
45  // Source string to create memory stream:
46  private SubscriptionDataSource _source;
47 
48  private bool _endOfStream;
49 
50  private IEnumerator<BaseData> _subscriptionFactoryEnumerator;
51 
52  /// Configuration of the data-reader:
53  private readonly SubscriptionDataConfig _config;
54 
55  /// true if we can find a scale factor file for the security of the form: ..\Lean\Data\equity\market\factor_files\{SYMBOL}.csv
56  private bool _hasScaleFactors;
57 
58  // Location of the datafeed - the type of this data.
59 
60  // Create a single instance to invoke all Type Methods:
61  private BaseData _dataFactory;
62 
63  //Start finish times of the backtest:
64  private DateTime _periodStart;
65  private readonly DateTime _periodFinish;
66 
67  private readonly IMapFileProvider _mapFileProvider;
68  private readonly IFactorFileProvider _factorFileProvider;
69  private IFactorProvider _factorFile;
70  private MapFile _mapFile;
71 
72  private bool _pastDelistedDate;
73 
74  private BaseData _previous;
75  private decimal? _lastRawPrice;
76  private DateChangeTimeKeeper _timeKeeper;
77  private readonly IEnumerable<DateTime> _tradableDatesInDataTimeZone;
78  private readonly SecurityExchangeHours _exchangeHours;
79 
80  // used when emitting aux data from within while loop
81  private readonly IDataCacheProvider _dataCacheProvider;
82  private DateTime _delistingDate;
83 
84  private bool _updatingDataEnumerator;
85 
86  /// <summary>
87  /// Event fired when an invalid configuration has been detected
88  /// </summary>
89  public event EventHandler<InvalidConfigurationDetectedEventArgs> InvalidConfigurationDetected;
90 
91  /// <summary>
92  /// Event fired when the numerical precision in the factor file has been limited
93  /// </summary>
94  public event EventHandler<NumericalPrecisionLimitedEventArgs> NumericalPrecisionLimited;
95 
96  /// <summary>
97  /// Event fired when the start date has been limited
98  /// </summary>
99  public event EventHandler<StartDateLimitedEventArgs> StartDateLimited;
100 
101  /// <summary>
102  /// Event fired when there was an error downloading a remote file
103  /// </summary>
104  public event EventHandler<DownloadFailedEventArgs> DownloadFailed;
105 
106  /// <summary>
107  /// Event fired when there was an error reading the data
108  /// </summary>
109  public event EventHandler<ReaderErrorDetectedEventArgs> ReaderErrorDetected;
110 
111  /// <summary>
112  /// Event fired when there is a new tradable date
113  /// </summary>
114  public event EventHandler<NewTradableDateEventArgs> NewTradableDate;
115 
116  /// <summary>
117  /// Last read BaseData object from this type and source
118  /// </summary>
119  public BaseData Current
120  {
121  get;
122  private set;
123  }
124 
125  /// <summary>
126  /// Explicit Interface Implementation for Current
127  /// </summary>
128  object IEnumerator.Current
129  {
130  get { return Current; }
131  }
132 
133  /// <summary>
134  /// Subscription data reader takes a subscription request, loads the type, accepts the data source and enumerate on the results.
135  /// </summary>
136  /// <param name="config">Subscription configuration object</param>
137  /// <param name="dataRequest">The data request</param>
138  /// <param name="mapFileProvider">Used for resolving the correct map files</param>
139  /// <param name="factorFileProvider">Used for getting factor files</param>
140  /// <param name="dataCacheProvider">Used for caching files</param>
141  /// <param name="dataProvider">The data provider to use</param>
143  BaseDataRequest dataRequest,
144  IMapFileProvider mapFileProvider,
145  IFactorFileProvider factorFileProvider,
146  IDataCacheProvider dataCacheProvider,
147  IDataProvider dataProvider,
148  IObjectStore objectStore)
149  {
150  //Save configuration of data-subscription:
151  _config = config;
152 
153  //Save Start and End Dates:
154  _periodStart = dataRequest.StartTimeLocal;
155  _periodFinish = dataRequest.EndTimeLocal;
156  _mapFileProvider = mapFileProvider;
157  _factorFileProvider = factorFileProvider;
158  _dataCacheProvider = dataCacheProvider;
159 
160  _dataProvider = dataProvider;
161  _objectStore = objectStore;
162 
163  _tradableDatesInDataTimeZone = dataRequest.TradableDaysInDataTimeZone;
164  _exchangeHours = dataRequest.ExchangeHours;
165  }
166 
167  /// <summary>
168  /// Initializes the <see cref="SubscriptionDataReader"/> instance
169  /// </summary>
170  /// <remarks>Should be called after all consumers of <see cref="NewTradableDate"/> event are set,
171  /// since it will produce events.</remarks>
172  public void Initialize()
173  {
174  if (_initialized)
175  {
176  return;
177  }
178 
179  //Save the type of data we'll be getting from the source.
180  try
181  {
182  _dataFactory = _config.GetBaseDataInstance();
183  }
184  catch (ArgumentException exception)
185  {
186  OnInvalidConfigurationDetected(new InvalidConfigurationDetectedEventArgs(_config.Symbol, exception.Message));
187  _endOfStream = true;
188  return;
189  }
190 
191  // If Tiingo data, set the access token in data factory
192  var tiingo = _dataFactory as TiingoPrice;
193  if (tiingo != null)
194  {
195  if (!Tiingo.IsAuthCodeSet)
196  {
197  Tiingo.SetAuthCode(Config.Get("tiingo-auth-token"));
198  }
199  }
200 
201  // load up the map files for equities, options, and custom data if it supports it.
202  // Only load up factor files for equities
203  if (_dataFactory.RequiresMapping())
204  {
205  try
206  {
207  var mapFile = _mapFileProvider.ResolveMapFile(_config);
208 
209  // only take the resolved map file if it has data, otherwise we'll use the empty one we defined above
210  if (mapFile.Any()) _mapFile = mapFile;
211 
212  if (_config.PricesShouldBeScaled())
213  {
214  var factorFile = _factorFileProvider.Get(_config.Symbol);
215  _hasScaleFactors = factorFile != null;
216  if (_hasScaleFactors)
217  {
218  _factorFile = factorFile;
219 
220  // if factor file has minimum date, update start period if before minimum date
221  if (_factorFile != null && _factorFile.FactorFileMinimumDate.HasValue)
222  {
223  if (_periodStart < _factorFile.FactorFileMinimumDate.Value)
224  {
225  _periodStart = _factorFile.FactorFileMinimumDate.Value;
226 
228  new NumericalPrecisionLimitedEventArgs(_config.Symbol,
229  $"[{_config.Symbol.Value}, {_factorFile.FactorFileMinimumDate.Value.ToShortDateString()}]"));
230  }
231  }
232  }
233 
234  if (_periodStart < mapFile.FirstDate)
235  {
236  _periodStart = mapFile.FirstDate;
237 
239  new StartDateLimitedEventArgs(_config.Symbol,
240  $"[{_config.Symbol.Value}," +
241  $" {mapFile.FirstDate.ToString("yyyy-MM-dd", CultureInfo.InvariantCulture)}]"));
242  }
243  }
244  }
245  catch (Exception err)
246  {
247  Log.Error(err, "Fetching Price/Map Factors: " + _config.Symbol.ID + ": ");
248  }
249  }
250 
251  _factorFile ??= _config.Symbol.GetEmptyFactorFile();
252  _mapFile ??= new MapFile(_config.Symbol.Value, Enumerable.Empty<MapFileRow>());
253 
254  _delistingDate = _config.Symbol.GetDelistingDate(_mapFile);
255 
256  _timeKeeper = new DateChangeTimeKeeper(_tradableDatesInDataTimeZone, _config, _exchangeHours, _delistingDate);
257  _timeKeeper.NewExchangeDate += HandleNewTradableDate;
258 
259  UpdateDataEnumerator(true);
260 
261  _initialized = true;
262  }
263 
264  /// <summary>
265  /// Advances the enumerator to the next element of the collection.
266  /// </summary>
267  /// <returns>
268  /// true if the enumerator was successfully advanced to the next element; false if the enumerator has passed the end of the collection.
269  /// </returns>
270  /// <exception cref="T:System.InvalidOperationException">The collection was modified after the enumerator was created. </exception><filterpriority>2</filterpriority>
271  public bool MoveNext()
272  {
273  if (!_initialized)
274  {
275  // Late initialization so it is performed in the data feed stack
276  // and not in the algorithm thread
277  Initialize();
278  }
279 
280  if (_endOfStream)
281  {
282  return false;
283  }
284 
285  if (Current != null)
286  {
287  // only save previous price data
288  _previous = Current;
289  }
290 
291  if (_subscriptionFactoryEnumerator == null)
292  {
293  _endOfStream = true;
294  return false;
295  }
296 
297  do
298  {
299  if (_pastDelistedDate)
300  {
301  break;
302  }
303 
304  // keep enumerating until we find something that is within our time frame
305  while (_subscriptionFactoryEnumerator.MoveNext())
306  {
307  var instance = _subscriptionFactoryEnumerator.Current;
308  if (instance == null)
309  {
310  // keep reading until we get valid data
311  continue;
312  }
313 
314  // We rely on symbol change to detect a mapping or symbol change, instead of using SubscriptionDataConfig.NewSymbol
315  // because only one of the configs with the same symbol will trigger a symbol change event.
316  var previousMappedSymbol = _config.MappedSymbol;
317 
318  // Advance the time keeper either until the current instance time (to synchronize) or until the source changes.
319  // Note: use time instead of end time to avoid skipping instances that all have the same timestamps in the same file (e.g. universe data)
320  var currentSource = _source;
321  var nextExchangeDate = _config.Resolution == Resolution.Daily
322  && _timeKeeper.IsExchangeBehindData()
323  && !_config.Type.IsAssignableTo(typeof(BaseDataCollection))
324  // If daily and exchange is behind data, data for date X will have a start time within date X-1,
325  // so we use the actual date from end time. e.g. a daily bar for Jan15 can have a start time of Jan14 8PM
326  // (exchange tz 4 hours behind data tz) and end time would be Jan15 8PM.
327  // This doesn't apply to universe files (BaseDataCollection check) because they are not read in the same way
328  // price daily files are read: they are read in a collection with end time of X+1. We don't want to skip them or advance time yet.
329  ? instance.EndTime
330  : instance.Time;
331  while (_timeKeeper.ExchangeTime < nextExchangeDate && currentSource == _source)
332  {
333  _timeKeeper.AdvanceTowardsExchangeTime(nextExchangeDate);
334  }
335 
336  // Source change, check if we should emit the current instance
337  if (currentSource != _source
338  && (
339  // After a mapping for every resolution except daily:
340  // For other resolutions, the instance that triggered the exchange date change should be skipped,
341  // it's end time will be either midnight or for a future date. The new source might have a data point with this times.
342  (_config.MappedSymbol != previousMappedSymbol && _config.Resolution != Resolution.Daily)
343  // Skip if the exchange time zone is behind of the data time zone:
344  // The new source might have data for these same times, we want data for the new symbol
345  || (_config.Resolution == Resolution.Daily && _timeKeeper.IsExchangeBehindData())
346  // skip if the instance if it's beyond what the previous source should have.
347  // e.g. A file mistakenly has data for the next day
348  // (see SubscriptionDataReaderTests.DoesNotEmitDataBeyondTradableDate unit test)
349  // or the instance that triggered the exchange date change is for a future date (no data found in between)
350  || instance.EndTime.ConvertTo(_config.ExchangeTimeZone, _config.DataTimeZone).Date >= _timeKeeper.DataTime.Date
351  ))
352  {
353  continue;
354  }
355 
356  // This can happen after a mapping, we already have data but we need to skip some points that belong to a previous date.
357  if (Current != null && instance.EndTime < _timeKeeper.ExchangeTime)
358  {
359  continue;
360  }
361 
362  // prevent emitting past data, this can happen when switching symbols on daily data
363  if (_previous != null && _config.Resolution != Resolution.Tick)
364  {
365  if (_config.IsCustomData)
366  {
367  // Skip the point if time went backwards for custom data?
368  // TODO: Should this be the case for all datapoints?
369  if (instance.EndTime < _previous.EndTime) continue;
370  }
371  else
372  {
373  // all other resolutions don't allow duplicate end times
374  if (instance.EndTime <= _previous.EndTime) continue;
375  }
376  }
377 
378  if (instance.EndTime < _periodStart)
379  {
380  // keep reading until we get a value on or after the start
381  _previous = instance;
382  continue;
383  }
384 
385  // We have to perform this check after refreshing the enumerator, if appropriate
386  // 'instance' could be a data point far in the future due to remapping (GH issue 5232) in which case it will be dropped
387  if (instance.Time > _periodFinish)
388  {
389  // stop reading when we get a value after the end
390  _endOfStream = true;
391  return false;
392  }
393 
394  // we've made it past all of our filters, we're withing the requested start/end of the subscription,
395  // we've satisfied user and market hour filters, so this data is good to go as current
396  Current = instance;
397 
398  // we keep the last raw price registered before we return so we are not affected by anyone (price scale) modifying our current
399  _lastRawPrice = Current.Price;
400  return true;
401  }
402 
403  // we've ended the enumerator, time to refresh
404  UpdateDataEnumerator(true);
405  }
406  while (_subscriptionFactoryEnumerator != null);
407 
408  _endOfStream = true;
409  return false;
410  }
411 
412  /// <summary>
413  /// Emits a new tradable date event and tries to update the data enumerator if necessary
414  /// </summary>
415  private void HandleNewTradableDate(object sender, DateTime date)
416  {
417  OnNewTradableDate(new NewTradableDateEventArgs(date, _previous, _config.Symbol, _lastRawPrice));
418  UpdateDataEnumerator(false);
419  }
420 
421  /// <summary>
422  /// Resolves the next enumerator to be used in <see cref="MoveNext"/> and updates
423  /// <see cref="_subscriptionFactoryEnumerator"/>
424  /// </summary>
425  /// <returns>True, if the enumerator has been updated (even if updated to null)</returns>
426  private bool UpdateDataEnumerator(bool endOfEnumerator)
427  {
428  // Guard for infinite recursion: during an enumerator update, we might ask for a new date,
429  // which might end up with a new exchange date being detected and another update being requested.
430  // Just skip that update and let's do it ourselves after the date is resolved
431  if (_updatingDataEnumerator)
432  {
433  return false;
434  }
435 
436  _updatingDataEnumerator = true;
437  try
438  {
439  do
440  {
441  var date = _timeKeeper.DataTime.Date;
442 
443  // Update current date only if the enumerator has ended, else we might just need to change files
444  // (e.g. same date, but symbol was mapped)
445  if (endOfEnumerator && !TryGetNextDate(out date))
446  {
447  _subscriptionFactoryEnumerator = null;
448  // if we run out of dates then we're finished with this subscription
449  return true;
450  }
451 
452  // fetch the new source, using the data time zone for the date
453  var newSource = _dataFactory.GetSource(_config, date, false);
454  if (newSource == null)
455  {
456  // move to the next day
457  continue;
458  }
459 
460  // check if we should create a new subscription factory
461  var sourceChanged = _source != newSource && !string.IsNullOrEmpty(newSource.Source);
462  if (sourceChanged)
463  {
464  // dispose of the current enumerator before creating a new one
465  _subscriptionFactoryEnumerator.DisposeSafely();
466 
467  // save off for comparison next time
468  _source = newSource;
469  var subscriptionFactory = CreateSubscriptionFactory(newSource, _dataFactory, _dataProvider);
470  _subscriptionFactoryEnumerator = SortEnumerator<DateTime>.TryWrapSortEnumerator(newSource.Sort, subscriptionFactory.Read(newSource));
471  return true;
472  }
473 
474  // if there's still more in the enumerator and we received the same source from the GetSource call
475  // above, then just keep using the same enumerator as we were before
476  if (!endOfEnumerator) // && !sourceChanged is always true here
477  {
478  return false;
479  }
480 
481  // keep churning until we find a new source or run out of tradeable dates
482  // in live mode tradeable dates won't advance beyond today's date, but
483  // TryGetNextDate will return false if it's already at today
484  }
485  while (true);
486  }
487  finally
488  {
489  _updatingDataEnumerator = false;
490  }
491  }
492 
493  private ISubscriptionDataSourceReader CreateSubscriptionFactory(SubscriptionDataSource source, BaseData baseDataInstance, IDataProvider dataProvider)
494  {
495  var factory = SubscriptionDataSourceReader.ForSource(source, _dataCacheProvider, _config, _timeKeeper.DataTime.Date, false, baseDataInstance, dataProvider, _objectStore);
496  AttachEventHandlers(factory, source);
497  return factory;
498  }
499 
500  private void AttachEventHandlers(ISubscriptionDataSourceReader dataSourceReader, SubscriptionDataSource source)
501  {
502  dataSourceReader.InvalidSource += (sender, args) =>
503  {
504  if (_config.IsCustomData && !_config.Type.GetBaseDataInstance().IsSparseData())
505  {
507  new DownloadFailedEventArgs(_config.Symbol,
508  "We could not fetch the requested data. " +
509  "This may not be valid data, or a failed download of custom data. " +
510  $"Skipping source ({args.Source.Source})."));
511  return;
512  }
513 
514  switch (args.Source.TransportMedium)
515  {
516  case SubscriptionTransportMedium.LocalFile:
517  // the local uri doesn't exist, write an error and return null so we we don't try to get data for today
518  // Log.Trace(string.Format("SubscriptionDataReader.GetReader(): Could not find QC Data, skipped: {0}", source));
519  break;
520 
521  case SubscriptionTransportMedium.RemoteFile:
523  new DownloadFailedEventArgs(_config.Symbol,
524  $"Error downloading custom data source file, skipped: {source} " +
525  $"Error: {args.Exception.Message}", args.Exception.StackTrace));
526  break;
527 
528  case SubscriptionTransportMedium.Rest:
529  break;
530 
531  case SubscriptionTransportMedium.ObjectStore:
532  break;
533 
534  default:
535  throw new ArgumentOutOfRangeException();
536  }
537  };
538 
539  if (dataSourceReader is TextSubscriptionDataSourceReader)
540  {
541  // handle empty files/instantiation errors
542  var textSubscriptionFactory = (TextSubscriptionDataSourceReader)dataSourceReader;
543  // handle parser errors
544  textSubscriptionFactory.ReaderError += (sender, args) =>
545  {
547  new ReaderErrorDetectedEventArgs(_config.Symbol,
548  $"Error invoking {_config.Symbol} data reader. " +
549  $"Line: {args.Line} Error: {args.Exception.Message}",
550  args.Exception.StackTrace));
551  };
552  }
553  }
554 
555  /// <summary>
556  /// Iterates the tradeable dates enumerator
557  /// </summary>
558  /// <param name="date">The next tradeable date</param>
559  /// <returns>True if we got a new date from the enumerator, false if it's exhausted, or in live mode if we're already at today</returns>
560  private bool TryGetNextDate(out DateTime date)
561  {
562  while (_timeKeeper.TryAdvanceUntilNextDataDate())
563  {
564  date = _timeKeeper.DataTime.Date;
565 
566  if (!_mapFile.HasData(date))
567  {
568  continue;
569  }
570 
571  // don't do other checks if we haven't gotten data for this date yet
572  if (_previous != null && _previous.EndTime.ConvertTo(_config.ExchangeTimeZone, _config.DataTimeZone) > date)
573  {
574  continue;
575  }
576 
577  // we've passed initial checks,now go get data for this date!
578  return true;
579  }
580 
581  if (_timeKeeper.ExchangeTime.Date > _delistingDate)
582  {
583  _pastDelistedDate = true;
584  }
585 
586  // no more tradeable dates, we've exhausted the enumerator
587  date = DateTime.MaxValue.Date;
588  return false;
589  }
590 
591  /// <summary>
592  /// Reset the IEnumeration
593  /// </summary>
594  /// <remarks>Not used</remarks>
595  public void Reset()
596  {
597  throw new NotImplementedException("Reset method not implemented. Assumes loop will only be used once.");
598  }
599 
600  /// <summary>
601  /// Dispose of the Stream Reader and close out the source stream and file connections.
602  /// </summary>
603  public void Dispose()
604  {
605  _subscriptionFactoryEnumerator.DisposeSafely();
606 
607  if (_initialized)
608  {
609  _timeKeeper.NewExchangeDate -= HandleNewTradableDate;
610  _timeKeeper.DisposeSafely();
611  }
612  }
613 
614  /// <summary>
615  /// Event invocator for the <see cref="InvalidConfigurationDetected"/> event
616  /// </summary>
617  /// <param name="e">Event arguments for the <see cref="InvalidConfigurationDetected"/> event</param>
619  {
620  InvalidConfigurationDetected?.Invoke(this, e);
621  }
622 
623  /// <summary>
624  /// Event invocator for the <see cref="NumericalPrecisionLimited"/> event
625  /// </summary>
626  /// <param name="e">Event arguments for the <see cref="NumericalPrecisionLimited"/> event</param>
628  {
629  NumericalPrecisionLimited?.Invoke(this, e);
630  }
631 
632  /// <summary>
633  /// Event invocator for the <see cref="StartDateLimited"/> event
634  /// </summary>
635  /// <param name="e">Event arguments for the <see cref="StartDateLimited"/> event</param>
637  {
638  StartDateLimited?.Invoke(this, e);
639  }
640 
641  /// <summary>
642  /// Event invocator for the <see cref="DownloadFailed"/> event
643  /// </summary>
644  /// <param name="e">Event arguments for the <see cref="DownloadFailed"/> event</param>
645  protected virtual void OnDownloadFailed(DownloadFailedEventArgs e)
646  {
647  DownloadFailed?.Invoke(this, e);
648  }
649 
650  /// <summary>
651  /// Event invocator for the <see cref="ReaderErrorDetected"/> event
652  /// </summary>
653  /// <param name="e">Event arguments for the <see cref="ReaderErrorDetected"/> event</param>
655  {
656  ReaderErrorDetected?.Invoke(this, e);
657  }
658 
659  /// <summary>
660  /// Event invocator for the <see cref="NewTradableDate"/> event
661  /// </summary>
662  /// <param name="e">Event arguments for the <see cref="NewTradableDate"/> event</param>
663  protected virtual void OnNewTradableDate(NewTradableDateEventArgs e)
664  {
665  NewTradableDate?.Invoke(this, e);
666  }
667  }
668 }