21 using System.Collections;
22 using System.Globalization;
25 using System.Collections.Generic;
43 private bool _initialized;
48 private bool _endOfStream;
// Enumerator over the data of the current source. It is advanced with MoveNext(), disposed in
// UpdateDataEnumerator before a new subscription factory is created, and set to null there when
// the previous enumerator ended and TryGetNextDate could not provide another date.
50 private IEnumerator<BaseData> _subscriptionFactoryEnumerator;
// True when the factor file provider returned a non-null factor file for the symbol; the provider
// is only queried when _config.PricesShouldBeScaled() is true.
56 private bool _hasScaleFactors;
// Start of the requested period. It is raised to the map file's first date when it lies before it,
// and data whose EndTime is earlier than this value is checked for explicitly.
64 private DateTime _periodStart;
// End of the requested period; each data point's Time is compared against it.
65 private readonly DateTime _periodFinish;
// Set to true by TryGetNextDate once a candidate date is later than _delistingDate.
72 private bool _pastDelistedDate;
75 private decimal? _lastRawPrice;
// Built from the tradable dates, the subscription config, the exchange hours and the delisting date.
// Its NewExchangeDate event is wired to HandleNewTradableDate.
76 private DateChangeTimeKeeper _timeKeeper;
// Tradable dates handed to the DateChangeTimeKeeper constructor.
77 private readonly IEnumerable<DateTime> _tradableDatesInDataTimeZone;
// Obtained from _config.Symbol.GetDelistingDate(_mapFile); the visible code then moves it one day
// later with AddDays(1).
82 private DateTime _delistingDate;
// Flag checked at the top of UpdateDataEnumerator, set to true while it runs and reset to false at
// its end. NOTE(review): presumably prevents re-entrant updates triggered through
// HandleNewTradableDate - confirm.
84 private bool _updatingDataEnumerator;
128 object IEnumerator.Current
156 _mapFileProvider = mapFileProvider;
157 _factorFileProvider = factorFileProvider;
158 _dataCacheProvider = dataCacheProvider;
160 _dataProvider = dataProvider;
161 _objectStore = objectStore;
182 _dataFactory = _config.GetBaseDataInstance();
184 catch (ArgumentException exception)
207 var mapFile = _mapFileProvider.ResolveMapFile(_config);
210 if (mapFile.Any()) _mapFile = mapFile;
212 if (_config.PricesShouldBeScaled())
214 var factorFile = _factorFileProvider.Get(_config.Symbol);
215 _hasScaleFactors = factorFile !=
null;
216 if (_hasScaleFactors)
218 _factorFile = factorFile;
229 $
"[{_config.Symbol.Value}, {_factorFile.FactorFileMinimumDate.Value.ToShortDateString()}]"));
234 if (_periodStart < mapFile.FirstDate)
236 _periodStart = mapFile.FirstDate;
240 $
"[{_config.Symbol.Value}," +
241 $
" {mapFile.FirstDate.ToString("yyyy-MM-dd
", CultureInfo.InvariantCulture)}]"));
245 catch (Exception err)
247 Log.
Error(err,
"Fetching Price/Map Factors: " + _config.Symbol.ID +
": ");
251 _factorFile ??= _config.Symbol.GetEmptyFactorFile();
254 _delistingDate = _config.Symbol.GetDelistingDate(_mapFile);
257 _delistingDate = _delistingDate.AddDays(1);
259 _timeKeeper =
new DateChangeTimeKeeper(_tradableDatesInDataTimeZone, _config, _exchangeHours, _delistingDate);
260 _timeKeeper.NewExchangeDate += HandleNewTradableDate;
262 UpdateDataEnumerator(
true);
294 if (_subscriptionFactoryEnumerator ==
null)
302 if (_pastDelistedDate)
308 while (_subscriptionFactoryEnumerator.MoveNext())
310 var instance = _subscriptionFactoryEnumerator.Current;
311 if (instance ==
null)
319 var previousMappedSymbol = _config.MappedSymbol;
323 var currentSource = _source;
324 var nextExchangeDate = _config.Resolution ==
Resolution.Daily
325 && _timeKeeper.IsExchangeBehindData()
334 while (_timeKeeper.ExchangeTime < nextExchangeDate && currentSource == _source)
336 _timeKeeper.AdvanceTowardsExchangeTime(nextExchangeDate);
340 if (currentSource != _source
345 (_config.MappedSymbol != previousMappedSymbol && _config.Resolution !=
Resolution.Daily)
348 || (_config.Resolution ==
Resolution.Daily && _timeKeeper.IsExchangeBehindData())
353 || instance.EndTime.ConvertTo(_config.ExchangeTimeZone, _config.DataTimeZone).Date >= _timeKeeper.DataTime.Date
360 if (
Current !=
null && instance.
EndTime < _timeKeeper.ExchangeTime)
366 if (_previous !=
null && _config.Resolution !=
Resolution.Tick)
368 if (_config.IsCustomData)
372 if (instance.EndTime < _previous.
EndTime)
continue;
377 if (instance.EndTime <= _previous.
EndTime)
continue;
381 if (instance.EndTime < _periodStart)
384 _previous = instance;
390 if (instance.Time > _periodFinish)
407 UpdateDataEnumerator(
true);
409 while (_subscriptionFactoryEnumerator !=
null);
/// <summary>
/// Handler attached to the time keeper's NewExchangeDate event. It asks for the data
/// enumerator to be updated, passing <c>false</c> for <c>endOfEnumerator</c>, i.e. without
/// requesting an advance to the next data date.
/// </summary>
/// <param name="sender">The event sender; not used in the visible body</param>
/// <param name="date">The date delivered with the event; not used in the visible body</param>
418 private void HandleNewTradableDate(
object sender, DateTime date)
421 UpdateDataEnumerator(
false);
/// <summary>
/// Resolves the subscription data source for a data date and, in the visible code, disposes the
/// current enumerator and creates a subscription factory for the resolved source.
/// </summary>
/// <param name="endOfEnumerator">
/// When true, the method first tries to move to the next date through TryGetNextDate; if no date is
/// available the enumerator is set to null. When false, the time keeper's current data date is used.
/// </param>
/// <returns>
/// A bool; the return statements are not visible in this excerpt.
/// NOTE(review): presumably indicates whether a new enumerator was put in place - confirm.
/// </returns>
429 private bool UpdateDataEnumerator(
bool endOfEnumerator)
// Check of the in-progress flag; the action taken when it is already set is not visible here.
434 if (_updatingDataEnumerator)
439 _updatingDataEnumerator =
true;
// Default to the date the time keeper currently reports in the data time zone.
444 var date = _timeKeeper.DataTime.Date;
// Only when the previous enumerator ended do we ask for the next date; 'date' is overwritten by the
// out parameter. Without a next date there is nothing left to read, so the enumerator is cleared.
448 if (endOfEnumerator && !TryGetNextDate(out date))
450 _subscriptionFactoryEnumerator =
null;
// Ask the data factory instance for the source of the chosen date (third argument is false).
456 var newSource = _dataFactory.
GetSource(_config, date,
false);
457 if (newSource ==
null)
// The source counts as changed only if it differs from the current one and its Source string is
// neither null nor empty.
464 var sourceChanged = _source != newSource && !
string.IsNullOrEmpty(newSource.Source);
// Release the previous enumerator before a factory for the new source is created.
468 _subscriptionFactoryEnumerator.DisposeSafely();
472 var subscriptionFactory = CreateSubscriptionFactory(newSource, _dataFactory, _dataProvider);
// Branch taken when the update was not caused by the enumerator ending; its body is not visible here.
479 if (!endOfEnumerator)
// Clear the in-progress flag that was set at the top of the method.
492 _updatingDataEnumerator =
false;
498 var factory = SubscriptionDataSourceReader.ForSource(source, _dataCacheProvider, _config, _timeKeeper.DataTime.Date,
false, baseDataInstance, dataProvider, _objectStore);
499 AttachEventHandlers(factory, source);
/// <summary>
/// Subscribes to the error events of a subscription data source reader: InvalidSource on every
/// reader and, for text readers, ReaderError as well. The handlers build DownloadFailedEventArgs /
/// ReaderErrorDetectedEventArgs describing the problem; how those arguments are dispatched is not
/// visible in this excerpt.
/// </summary>
/// <param name="dataSourceReader">The reader whose events are subscribed to</param>
/// <param name="source">The source the reader was created for; interpolated into one error message</param>
503 private void AttachEventHandlers(ISubscriptionDataSourceReader dataSourceReader,
SubscriptionDataSource source)
505 dataSourceReader.InvalidSource += (sender, args) =>
// Custom data that is not flagged as sparse gets a "could not fetch" message naming the skipped source.
507 if (_config.IsCustomData && !_config.Type.GetBaseDataInstance().IsSparseData())
510 new DownloadFailedEventArgs(_config.Symbol,
511 "We could not fetch the requested data. " +
512 "This may not be valid data, or a failed download of custom data. " +
513 $
"Skipping source ({args.Source.Source})."));
// Further handling depends on the transport medium of the failing source; the case labels are not
// visible in this excerpt.
517 switch (args.Source.TransportMedium)
// NOTE(review): this message interpolates the method parameter 'source' (the object itself), while
// the message above uses args.Source.Source - confirm that the difference is intended.
526 new DownloadFailedEventArgs(_config.Symbol,
527 $
"Error downloading custom data source file, skipped: {source} " +
528 $
"Error: {args.Exception.Message}", args.Exception.StackTrace));
// NOTE(review): presumably the default case for an unexpected transport medium - confirm.
538 throw new ArgumentOutOfRangeException();
// Text readers additionally expose ReaderError, raised with the offending line and the exception.
// NOTE(review): the type check followed by a cast could be a single 'is' pattern with a
// declaration; left unchanged here.
542 if (dataSourceReader is TextSubscriptionDataSourceReader)
545 var textSubscriptionFactory = (TextSubscriptionDataSourceReader)dataSourceReader;
547 textSubscriptionFactory.ReaderError += (sender, args) =>
550 new ReaderErrorDetectedEventArgs(_config.Symbol,
551 $
"Error invoking {_config.Symbol} data reader. " +
552 $
"Line: {args.Line} Error: {args.Exception.Message}",
553 args.Exception.StackTrace));
/// <summary>
/// Advances the time keeper to the next data date, repeating while
/// TryAdvanceUntilNextDataDate() succeeds, and reports the candidate date through
/// <paramref name="date"/>.
/// </summary>
/// <param name="date">
/// Receives the time keeper's data date for each candidate; set to DateTime.MaxValue.Date by the
/// final visible assignment.
/// </param>
/// <returns>
/// A bool; the return statements are not visible in this excerpt.
/// NOTE(review): presumably true when a usable date was found and false otherwise - confirm.
/// </returns>
563 private bool TryGetNextDate(out DateTime date)
565 while (_timeKeeper.TryAdvanceUntilNextDataDate())
567 date = _timeKeeper.DataTime.Date;
// Once a date beyond the delisting date has been seen the flag stays set, so the first operand keeps
// this branch taken on later calls. What the branch does besides setting the flag is not visible here.
569 if (_pastDelistedDate || date > _delistingDate)
572 _pastDelistedDate =
true;
// Compares the last emitted data point's end time, converted from the exchange time zone to the data
// time zone, against the candidate date.
// NOTE(review): presumably skips dates already covered by that data point - the branch body is not
// visible, confirm.
582 if (_previous !=
null && _previous.
EndTime.ConvertTo(_config.ExchangeTimeZone, _config.DataTimeZone) > date)
// The out parameter must be assigned on every path; DateTime.MaxValue.Date is the value used here.
592 date = DateTime.MaxValue.Date;
602 throw new NotImplementedException(
"Reset method not implemented. Assumes loop will only be used once.");
610 _subscriptionFactoryEnumerator.DisposeSafely();
614 _timeKeeper.NewExchangeDate -= HandleNewTradableDate;
615 _timeKeeper.DisposeSafely();