Lean  $LEAN_TAG$
SubscriptionDataReader.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  *
15 */
16 
17 using System;
18 using System.Linq;
19 using QuantConnect.Util;
20 using QuantConnect.Data;
21 using System.Collections;
22 using System.Globalization;
23 using QuantConnect.Logging;
25 using System.Collections.Generic;
32 
34 {
35  /// <summary>
36  /// Subscription data reader is a wrapper on the stream reader class to download, unpack and iterate over a data file.
37  /// </summary>
38  /// <remarks>The class accepts any subscription configuration and automatically makes it available to enumerate</remarks>
39  public class SubscriptionDataReader : IEnumerator<BaseData>, ITradableDatesNotifier, IDataProviderEvents
40  {
// Data provider handed to the data source reader when a new source is opened.
private readonly IDataProvider _dataProvider;

// Object store handed to the data source reader factory.
private readonly IObjectStore _objectStore;

// Set once Initialize() has run to completion.
private bool _initialized;

// The source currently being read; compared with the next resolved source to detect a change.
private SubscriptionDataSource _source;

// Set when this reader will not produce any more data.
private bool _endOfStream;

// Enumerator over the data points of the current source.
private IEnumerator<BaseData> _subscriptionFactoryEnumerator;

// Configuration of the data-reader.
private readonly SubscriptionDataConfig _config;

// True if a scale factor file was found for the security,
// e.g. of the form: ..\Lean\Data\equity\market\factor_files\{SYMBOL}.csv
private bool _hasScaleFactors;

// Single instance of the configured data type, used to invoke its type methods (e.g. GetSource).
private BaseData _dataFactory;

// Start and finish times of the requested period.
private DateTime _periodStart;
private readonly DateTime _periodFinish;

private readonly IMapFileProvider _mapFileProvider;
private readonly IFactorFileProvider _factorFileProvider;
private IFactorProvider _factorFile;
private MapFile _mapFile;

// Set once the tradable dates have moved beyond the delisting date.
private bool _pastDelistedDate;

// Last data point seen: either the last one emitted or one skipped for being before the period start.
private BaseData _previous;

// Price of the last emitted data point, captured at the moment it was emitted.
private decimal? _lastRawPrice;
private DateChangeTimeKeeper _timeKeeper;
private readonly IEnumerable<DateTime> _tradableDatesInDataTimeZone;
private readonly SecurityExchangeHours _exchangeHours;

// Cache provider handed to the data source reader factory.
private readonly IDataCacheProvider _dataCacheProvider;

// Delisting date resolved from the map file, plus one day so reading stops at end of day.
private DateTime _delistingDate;

// Re-entrancy guard for UpdateDataEnumerator.
private bool _updatingDataEnumerator;
85 
86  /// <summary>
87  /// Event fired when an invalid configuration has been detected
88  /// </summary>
89  public event EventHandler<InvalidConfigurationDetectedEventArgs> InvalidConfigurationDetected;
90 
91  /// <summary>
92  /// Event fired when the numerical precision in the factor file has been limited
93  /// </summary>
94  public event EventHandler<NumericalPrecisionLimitedEventArgs> NumericalPrecisionLimited;
95 
96  /// <summary>
97  /// Event fired when the start date has been limited
98  /// </summary>
99  public event EventHandler<StartDateLimitedEventArgs> StartDateLimited;
100 
101  /// <summary>
102  /// Event fired when there was an error downloading a remote file
103  /// </summary>
104  public event EventHandler<DownloadFailedEventArgs> DownloadFailed;
105 
106  /// <summary>
107  /// Event fired when there was an error reading the data
108  /// </summary>
109  public event EventHandler<ReaderErrorDetectedEventArgs> ReaderErrorDetected;
110 
111  /// <summary>
112  /// Event fired when there is a new tradable date
113  /// </summary>
114  public event EventHandler<NewTradableDateEventArgs> NewTradableDate;
115 
/// <summary>
/// The most recent <see cref="BaseData"/> instance emitted by this reader
/// </summary>
public BaseData Current { get; private set; }
124 
/// <summary>
/// Non-generic enumerator access to <see cref="Current"/>
/// </summary>
object IEnumerator.Current => Current;
132 
/// <summary>
/// Creates a reader for the given subscription configuration and data request.
/// Only stores its inputs; the data type, map/factor files and first source are resolved in <see cref="Initialize"/>.
/// </summary>
/// <param name="config">Subscription configuration object</param>
/// <param name="dataRequest">The data request</param>
/// <param name="mapFileProvider">Used for resolving the correct map files</param>
/// <param name="factorFileProvider">Used for getting factor files</param>
/// <param name="dataCacheProvider">Used for caching files</param>
/// <param name="dataProvider">The data provider to use</param>
/// <param name="objectStore">The object store passed on to the data source readers</param>
public SubscriptionDataReader(SubscriptionDataConfig config,
    BaseDataRequest dataRequest,
    IMapFileProvider mapFileProvider,
    IFactorFileProvider factorFileProvider,
    IDataCacheProvider dataCacheProvider,
    IDataProvider dataProvider,
    IObjectStore objectStore)
{
    // Save configuration of data-subscription
    _config = config;

    // Save the requested period, expressed in local time by the request
    _periodStart = dataRequest.StartTimeLocal;
    _periodFinish = dataRequest.EndTimeLocal;

    _mapFileProvider = mapFileProvider;
    _factorFileProvider = factorFileProvider;
    _dataCacheProvider = dataCacheProvider;
    _dataProvider = dataProvider;
    _objectStore = objectStore;

    _tradableDatesInDataTimeZone = dataRequest.TradableDaysInDataTimeZone;
    _exchangeHours = dataRequest.ExchangeHours;
}
166 
/// <summary>
/// Initializes the <see cref="SubscriptionDataReader"/> instance: creates the data type instance,
/// resolves map and factor files when the data type requires mapping, creates the time keeper
/// and resolves the first data enumerator.
/// </summary>
/// <remarks>Should be called after all consumers of <see cref="NewTradableDate"/> event are set,
/// since it will produce events. Calling it again after it completed is a no-op.
/// If the data type instance cannot be created, <see cref="InvalidConfigurationDetected"/> is fired
/// and the reader is flagged as ended.</remarks>
public void Initialize()
{
    if (_initialized)
    {
        return;
    }

    // Create the instance used to invoke the methods of the configured data type
    try
    {
        _dataFactory = _config.GetBaseDataInstance();
    }
    catch (ArgumentException exception)
    {
        OnInvalidConfigurationDetected(new InvalidConfigurationDetectedEventArgs(_config.Symbol, exception.Message));
        _endOfStream = true;
        return;
    }

    // If Tiingo data, make sure the access token is set
    if (_dataFactory is TiingoPrice && !Tiingo.IsAuthCodeSet)
    {
        Tiingo.SetAuthCode(Config.Get("tiingo-auth-token"));
    }

    // Map files are loaded for any data type that requires mapping;
    // factor files only when prices should be scaled
    if (_dataFactory.RequiresMapping())
    {
        try
        {
            var mapFile = _mapFileProvider.ResolveMapFile(_config);

            // only take the resolved map file if it has data, otherwise the empty one created below is used
            if (mapFile.Any())
            {
                _mapFile = mapFile;
            }

            if (_config.PricesShouldBeScaled())
            {
                var factorFile = _factorFileProvider.Get(_config.Symbol);
                _hasScaleFactors = factorFile != null;
                if (_hasScaleFactors)
                {
                    _factorFile = factorFile;

                    // if the factor file has a minimum date, the period cannot start before it
                    var minimumDate = _factorFile.FactorFileMinimumDate;
                    if (minimumDate.HasValue && _periodStart < minimumDate.Value)
                    {
                        _periodStart = minimumDate.Value;

                        OnNumericalPrecisionLimited(
                            new NumericalPrecisionLimitedEventArgs(_config.Symbol,
                                $"[{_config.Symbol.Value}, {minimumDate.Value.ToShortDateString()}]"));
                    }
                }

                // the period cannot start before the first date of the resolved map file either
                if (_periodStart < mapFile.FirstDate)
                {
                    _periodStart = mapFile.FirstDate;

                    OnStartDateLimited(
                        new StartDateLimitedEventArgs(_config.Symbol,
                            $"[{_config.Symbol.Value}," +
                            $" {mapFile.FirstDate.ToString("yyyy-MM-dd", CultureInfo.InvariantCulture)}]"));
                }
            }
        }
        catch (Exception err)
        {
            // best effort: log and carry on with whatever was resolved so far
            Log.Error(err, "Fetching Price/Map Factors: " + _config.Symbol.ID + ": ");
        }
    }

    // fall back to empty factor/map files when none were resolved
    _factorFile ??= _config.Symbol.GetEmptyFactorFile();
    _mapFile ??= new MapFile(_config.Symbol.Value, Enumerable.Empty<MapFileRow>());

    // adding a day so we stop at EOD of the delisting date
    _delistingDate = _config.Symbol.GetDelistingDate(_mapFile).AddDays(1);

    _timeKeeper = new DateChangeTimeKeeper(_tradableDatesInDataTimeZone, _config, _exchangeHours, _delistingDate);
    _timeKeeper.NewExchangeDate += HandleNewTradableDate;

    // resolve the first enumerator
    UpdateDataEnumerator(true);

    _initialized = true;
}
266 
/// <summary>
/// Advances the enumerator to the next element of the collection.
/// </summary>
/// <remarks>
/// Initializes the reader on first use. Data points are read from the current source enumerator and
/// filtered (null instances, points made obsolete by a source change, points going back in time,
/// points before the period start); when the enumerator is exhausted the next source is resolved.
/// Once this method has returned false it keeps returning false.
/// </remarks>
/// <returns>
/// true if the enumerator was successfully advanced to the next element; false if the enumerator has passed the end of the collection.
/// </returns>
public bool MoveNext()
{
    // Once the stream has ended there is nothing left to do. Checking this before initializing
    // prevents a failed Initialize() from being re-run (and its event re-fired) on every later call.
    if (_endOfStream)
    {
        return false;
    }

    if (!_initialized)
    {
        // Late initialization so it is performed in the data feed stack
        // and not in the algorithm thread
        Initialize();

        if (_endOfStream)
        {
            // initialization flagged the stream as ended (invalid configuration)
            return false;
        }
    }

    if (Current != null)
    {
        // only save previous price data
        _previous = Current;
    }

    if (_subscriptionFactoryEnumerator == null)
    {
        _endOfStream = true;
        return false;
    }

    do
    {
        if (_pastDelistedDate)
        {
            break;
        }

        // keep enumerating until we find something that is within our time frame
        while (_subscriptionFactoryEnumerator.MoveNext())
        {
            var instance = _subscriptionFactoryEnumerator.Current;
            if (instance == null)
            {
                // keep reading until we get valid data
                continue;
            }

            // We rely on symbol change to detect a mapping or symbol change, instead of using SubscriptionDataConfig.NewSymbol
            // because only one of the configs with the same symbol will trigger a symbol change event.
            var previousMappedSymbol = _config.MappedSymbol;

            // Advance the time keeper either until the current instance time (to synchronize) or until the source changes.
            // Note: use time instead of end time to avoid skipping instances that all have the same timestamps in the same file (e.g. universe data)
            var currentSource = _source;

            // If daily and exchange is behind data, data for date X will have a start time within date X-1,
            // so we use the actual date from end time. e.g. a daily bar for Jan15 can have a start time of Jan14 8PM
            // (exchange tz 4 hours behind data tz) and end time would be Jan15 8PM.
            // This doesn't apply to universe files (BaseDataCollection check) because they are not read in the same way
            // price daily files are read: they are read in a collection with end time of X+1. We don't want to skip them or advance time yet.
            var synchronizeOnEndTime = _config.Resolution == Resolution.Daily
                && _timeKeeper.IsExchangeBehindData()
                && !_config.Type.IsAssignableTo(typeof(BaseDataCollection));
            var nextExchangeDate = synchronizeOnEndTime ? instance.EndTime : instance.Time;

            // Advancing the time keeper may raise new exchange dates, whose handler can swap
            // _source and _subscriptionFactoryEnumerator underneath this loop.
            while (_timeKeeper.ExchangeTime < nextExchangeDate && currentSource == _source)
            {
                _timeKeeper.AdvanceTowardsExchangeTime(nextExchangeDate);
            }

            // Source change, check if we should emit the current instance
            if (currentSource != _source
                && (
                    // After a mapping for every resolution except daily:
                    // For other resolutions, the instance that triggered the exchange date change should be skipped,
                    // its end time will be either midnight or for a future date. The new source might have a data point with these times.
                    (_config.MappedSymbol != previousMappedSymbol && _config.Resolution != Resolution.Daily)
                    // Skip if the exchange time zone is behind of the data time zone:
                    // The new source might have data for these same times, we want data for the new symbol
                    || (_config.Resolution == Resolution.Daily && _timeKeeper.IsExchangeBehindData())
                    // skip the instance if it's beyond what the previous source should have.
                    // e.g. A file mistakenly has data for the next day
                    // (see SubscriptionDataReaderTests.DoesNotEmitDataBeyondTradableDate unit test)
                    // or the instance that triggered the exchange date change is for a future date (no data found in between)
                    || instance.EndTime.ConvertTo(_config.ExchangeTimeZone, _config.DataTimeZone).Date >= _timeKeeper.DataTime.Date
                ))
            {
                // the next MoveNext() call above reads from the new source's enumerator
                continue;
            }

            // This can happen after a mapping, we already have data but we need to skip some points that belong to a previous date.
            if (Current != null && instance.EndTime < _timeKeeper.ExchangeTime)
            {
                continue;
            }

            // prevent emitting past data, this can happen when switching symbols on daily data
            if (_previous != null && _config.Resolution != Resolution.Tick)
            {
                if (_config.IsCustomData)
                {
                    // Skip the point if time went backwards for custom data?
                    // TODO: Should this be the case for all datapoints?
                    if (instance.EndTime < _previous.EndTime)
                    {
                        continue;
                    }
                }
                else if (instance.EndTime <= _previous.EndTime)
                {
                    // all other resolutions don't allow duplicate end times
                    continue;
                }
            }

            if (instance.EndTime < _periodStart)
            {
                // keep reading until we get a value on or after the start
                _previous = instance;
                continue;
            }

            // We have to perform this check after refreshing the enumerator, if appropriate
            // 'instance' could be a data point far in the future due to remapping (GH issue 5232) in which case it will be dropped
            if (instance.Time > _periodFinish)
            {
                // stop reading when we get a value after the end
                _endOfStream = true;
                return false;
            }

            // we've made it past all of our filters, we're within the requested start/end of the subscription,
            // so this data is good to go as current
            Current = instance;

            // we keep the last raw price registered before we return so we are not affected by anyone (price scale) modifying our current
            _lastRawPrice = Current.Price;
            return true;
        }

        // we've ended the enumerator, time to refresh
        UpdateDataEnumerator(true);
    }
    while (_subscriptionFactoryEnumerator != null);

    _endOfStream = true;
    return false;
}
414 
/// <summary>
/// Handler for the time keeper's new exchange date notification: raises <see cref="NewTradableDate"/>
/// and then checks whether the data enumerator needs to be switched to another source
/// </summary>
/// <param name="sender">The event sender (not used)</param>
/// <param name="date">The new tradable date</param>
private void HandleNewTradableDate(object sender, DateTime date)
{
    var tradableDateArgs = new NewTradableDateEventArgs(date, _previous, _config.Symbol, _lastRawPrice);
    OnNewTradableDate(tradableDateArgs);

    // the current enumerator has not ended: don't advance the date, only switch if the source changed
    UpdateDataEnumerator(false);
}
423 
/// <summary>
/// Resolves the next enumerator to be used in <see cref="MoveNext"/> and updates
/// <see cref="_subscriptionFactoryEnumerator"/>
/// </summary>
/// <param name="endOfEnumerator">True if the current enumerator has been exhausted, in which case the
/// date is advanced through <see cref="TryGetNextDate"/>; false to only check whether the source
/// for the current data date differs from the one being read</param>
/// <returns>True, if the enumerator has been updated (even if updated to null)</returns>
private bool UpdateDataEnumerator(bool endOfEnumerator)
{
    // Guard for infinite recursion: during an enumerator update, we might ask for a new date,
    // which might end up with a new exchange date being detected and another update being requested.
    // Just skip that update and let's do it ourselves after the date is resolved
    if (_updatingDataEnumerator)
    {
        return false;
    }

    _updatingDataEnumerator = true;
    try
    {
        do
        {
            var date = _timeKeeper.DataTime.Date;

            // Update current date only if the enumerator has ended, else we might just need to change files
            // (e.g. same date, but symbol was mapped)
            if (endOfEnumerator && !TryGetNextDate(out date))
            {
                _subscriptionFactoryEnumerator = null;
                // if we run out of dates then we're finished with this subscription
                return true;
            }

            // fetch the new source, using the data time zone for the date
            var newSource = _dataFactory.GetSource(_config, date, false);
            if (newSource == null)
            {
                if (!endOfEnumerator)
                {
                    // The date is not advanced in this mode and nothing in this loop changes the
                    // inputs of GetSource, so retrying would spin forever: keep the current enumerator.
                    return false;
                }

                // move to the next day
                continue;
            }

            // check if we should create a new subscription factory
            var sourceChanged = _source != newSource && !string.IsNullOrEmpty(newSource.Source);
            if (sourceChanged)
            {
                // dispose of the current enumerator before creating a new one
                _subscriptionFactoryEnumerator.DisposeSafely();

                // save off for comparison next time
                _source = newSource;
                var subscriptionFactory = CreateSubscriptionFactory(newSource, _dataFactory, _dataProvider);
                _subscriptionFactoryEnumerator = SortEnumerator<DateTime>.TryWrapSortEnumerator(newSource.Sort, subscriptionFactory.Read(newSource));
                return true;
            }

            // if there's still more in the enumerator and we received the same source from the GetSource call
            // above, then just keep using the same enumerator as we were before
            if (!endOfEnumerator)
            {
                return false;
            }

            // keep churning until we find a new source or run out of tradeable dates
        }
        while (true);
    }
    finally
    {
        _updatingDataEnumerator = false;
    }
}
495 
/// <summary>
/// Creates the data source reader for the given source, using the time keeper's current data date,
/// and attaches this instance's error handlers to it
/// </summary>
/// <param name="source">The source to be read</param>
/// <param name="baseDataInstance">Instance of the data type, passed on to the reader factory</param>
/// <param name="dataProvider">The data provider, passed on to the reader factory</param>
/// <returns>The reader for the source</returns>
private ISubscriptionDataSourceReader CreateSubscriptionFactory(SubscriptionDataSource source, BaseData baseDataInstance, IDataProvider dataProvider)
{
    var currentDataDate = _timeKeeper.DataTime.Date;
    var sourceReader = SubscriptionDataSourceReader.ForSource(
        source,
        _dataCacheProvider,
        _config,
        currentDataDate,
        false,
        baseDataInstance,
        dataProvider,
        _objectStore);

    AttachEventHandlers(sourceReader, source);
    return sourceReader;
}
502 
/// <summary>
/// Subscribes to the error events of the given data source reader and forwards them as
/// <see cref="DownloadFailed"/> and <see cref="ReaderErrorDetected"/> events of this instance
/// </summary>
/// <param name="dataSourceReader">The reader whose events are handled</param>
/// <param name="source">The source the reader was created for, used in the remote file error message</param>
private void AttachEventHandlers(ISubscriptionDataSourceReader dataSourceReader, SubscriptionDataSource source)
{
    dataSourceReader.InvalidSource += (sender, args) =>
    {
        // custom data that is not sparse is expected to have its source available: always report
        if (_config.IsCustomData && !_config.Type.GetBaseDataInstance().IsSparseData())
        {
            OnDownloadFailed(
                new DownloadFailedEventArgs(_config.Symbol,
                    "We could not fetch the requested data. " +
                    "This may not be valid data, or a failed download of custom data. " +
                    $"Skipping source ({args.Source.Source})."));
            return;
        }

        switch (args.Source.TransportMedium)
        {
            case SubscriptionTransportMedium.LocalFile:
                // the local uri doesn't exist: nothing is reported for missing local files
                break;

            case SubscriptionTransportMedium.RemoteFile:
                OnDownloadFailed(
                    new DownloadFailedEventArgs(_config.Symbol,
                        $"Error downloading custom data source file, skipped: {source} " +
                        $"Error: {args.Exception.Message}", args.Exception.StackTrace));
                break;

            case SubscriptionTransportMedium.Rest:
                break;

            case SubscriptionTransportMedium.ObjectStore:
                break;

            default:
                throw new ArgumentOutOfRangeException();
        }
    };

    if (dataSourceReader is TextSubscriptionDataSourceReader textSubscriptionFactory)
    {
        // handle parser errors
        textSubscriptionFactory.ReaderError += (sender, args) =>
        {
            OnReaderErrorDetected(
                new ReaderErrorDetectedEventArgs(_config.Symbol,
                    $"Error invoking {_config.Symbol} data reader. " +
                    $"Line: {args.Line} Error: {args.Exception.Message}",
                    args.Exception.StackTrace));
        };
    }
}
557 
/// <summary>
/// Advances the time keeper to the next data date that should be read
/// </summary>
/// <remarks>
/// Dates for which the map file has no data are skipped, as are dates earlier than the end time
/// (converted to the data time zone) of the previous data point. Moving past the delisting date
/// flags the reader as past its delisting date and stops the search.
/// </remarks>
/// <param name="date">The next tradeable date, or the date of <see cref="DateTime.MaxValue"/> when none is left</param>
/// <returns>True if we got a new date, false if the time keeper is exhausted or the delisting date has been passed</returns>
private bool TryGetNextDate(out DateTime date)
{
    while (_timeKeeper.TryAdvanceUntilNextDataDate())
    {
        var candidate = _timeKeeper.DataTime.Date;

        if (_pastDelistedDate || candidate > _delistingDate)
        {
            // if we already passed our delisting date we stop
            _pastDelistedDate = true;
            break;
        }

        // no data in the map file for this date, or the previous data point already ends after it
        var skipDate = !_mapFile.HasData(candidate)
            || (_previous != null && _previous.EndTime.ConvertTo(_config.ExchangeTimeZone, _config.DataTimeZone) > candidate);

        if (!skipDate)
        {
            // we've passed initial checks, now go get data for this date!
            date = candidate;
            return true;
        }
    }

    // no more tradeable dates
    date = DateTime.MaxValue.Date;
    return false;
}
595 
/// <summary>
/// Not supported: this enumerator is meant to be iterated only once
/// </summary>
/// <exception cref="NotImplementedException">Always thrown</exception>
public void Reset() =>
    throw new NotImplementedException("Reset method not implemented. Assumes loop will only be used once.");
604 
/// <summary>
/// Disposes of the current source enumerator and, if it was created, unsubscribes from and disposes of the time keeper.
/// </summary>
public void Dispose()
{
    _subscriptionFactoryEnumerator.DisposeSafely();

    // Key off the time keeper itself rather than _initialized: Initialize() creates and subscribes
    // to the time keeper before resolving the first enumerator, so if that last step throws,
    // _initialized stays false while the time keeper still has to be released.
    if (_timeKeeper != null)
    {
        _timeKeeper.NewExchangeDate -= HandleNewTradableDate;
        _timeKeeper.DisposeSafely();
    }
}
618 
/// <summary>
/// Event invocator for the <see cref="InvalidConfigurationDetected"/> event
/// </summary>
/// <param name="e">Event arguments for the <see cref="InvalidConfigurationDetected"/> event</param>
protected virtual void OnInvalidConfigurationDetected(InvalidConfigurationDetectedEventArgs e)
{
    InvalidConfigurationDetected?.Invoke(this, e);
}
627 
/// <summary>
/// Event invocator for the <see cref="NumericalPrecisionLimited"/> event
/// </summary>
/// <param name="e">Event arguments for the <see cref="NumericalPrecisionLimited"/> event</param>
protected virtual void OnNumericalPrecisionLimited(NumericalPrecisionLimitedEventArgs e)
{
    NumericalPrecisionLimited?.Invoke(this, e);
}
636 
/// <summary>
/// Event invocator for the <see cref="StartDateLimited"/> event
/// </summary>
/// <param name="e">Event arguments for the <see cref="StartDateLimited"/> event</param>
protected virtual void OnStartDateLimited(StartDateLimitedEventArgs e)
{
    StartDateLimited?.Invoke(this, e);
}
645 
/// <summary>
/// Raises the <see cref="DownloadFailed"/> event, if anyone is subscribed to it
/// </summary>
/// <param name="e">Event arguments for the <see cref="DownloadFailed"/> event</param>
protected virtual void OnDownloadFailed(DownloadFailedEventArgs e) => DownloadFailed?.Invoke(this, e);
654 
/// <summary>
/// Event invocator for the <see cref="ReaderErrorDetected"/> event
/// </summary>
/// <param name="e">Event arguments for the <see cref="ReaderErrorDetected"/> event</param>
protected virtual void OnReaderErrorDetected(ReaderErrorDetectedEventArgs e)
{
    ReaderErrorDetected?.Invoke(this, e);
}
663 
/// <summary>
/// Raises the <see cref="NewTradableDate"/> event, if anyone is subscribed to it
/// </summary>
/// <param name="e">Event arguments for the <see cref="NewTradableDate"/> event</param>
protected virtual void OnNewTradableDate(NewTradableDateEventArgs e) => NewTradableDate?.Invoke(this, e);
672  }
673 }