21 using System.Collections;
22 using System.Globalization;
25 using System.Collections.Generic;
43 private bool _initialized;
48 private bool _endOfStream;
50 private IEnumerator<BaseData> _subscriptionFactoryEnumerator;
56 private bool _hasScaleFactors;
64 private DateTime _periodStart;
65 private readonly DateTime _periodFinish;
72 private bool _pastDelistedDate;
75 private decimal? _lastRawPrice;
76 private DateChangeTimeKeeper _timeKeeper;
77 private readonly IEnumerable<DateTime> _tradableDatesInDataTimeZone;
82 private DateTime _delistingDate;
84 private bool _updatingDataEnumerator;
128 object IEnumerator.Current
156 _mapFileProvider = mapFileProvider;
157 _factorFileProvider = factorFileProvider;
158 _dataCacheProvider = dataCacheProvider;
160 _dataProvider = dataProvider;
161 _objectStore = objectStore;
182 _dataFactory = _config.GetBaseDataInstance();
184 catch (ArgumentException exception)
207 var mapFile = _mapFileProvider.ResolveMapFile(_config);
210 if (mapFile.Any()) _mapFile = mapFile;
212 if (_config.PricesShouldBeScaled())
214 var factorFile = _factorFileProvider.Get(_config.Symbol);
215 _hasScaleFactors = factorFile !=
null;
216 if (_hasScaleFactors)
218 _factorFile = factorFile;
229 $
"[{_config.Symbol.Value}, {_factorFile.FactorFileMinimumDate.Value.ToShortDateString()}]"));
234 if (_periodStart < mapFile.FirstDate)
236 _periodStart = mapFile.FirstDate;
240 $
"[{_config.Symbol.Value}," +
241 $
" {mapFile.FirstDate.ToString("yyyy-MM-dd
", CultureInfo.InvariantCulture)}]"));
245 catch (Exception err)
247 Log.
Error(err,
"Fetching Price/Map Factors: " + _config.Symbol.ID +
": ");
251 _factorFile ??= _config.Symbol.GetEmptyFactorFile();
254 _delistingDate = _config.Symbol.GetDelistingDate(_mapFile);
256 _timeKeeper =
new DateChangeTimeKeeper(_tradableDatesInDataTimeZone, _config, _exchangeHours, _delistingDate);
257 _timeKeeper.NewExchangeDate += HandleNewTradableDate;
259 UpdateDataEnumerator(
true);
291 if (_subscriptionFactoryEnumerator ==
null)
299 if (_pastDelistedDate)
305 while (_subscriptionFactoryEnumerator.MoveNext())
307 var instance = _subscriptionFactoryEnumerator.Current;
308 if (instance ==
null)
316 var previousMappedSymbol = _config.MappedSymbol;
320 var currentSource = _source;
321 var nextExchangeDate = _config.Resolution ==
Resolution.Daily
322 && _timeKeeper.IsExchangeBehindData()
331 while (_timeKeeper.ExchangeTime < nextExchangeDate && currentSource == _source)
333 _timeKeeper.AdvanceTowardsExchangeTime(nextExchangeDate);
337 if (currentSource != _source
342 (_config.MappedSymbol != previousMappedSymbol && _config.Resolution !=
Resolution.Daily)
345 || (_config.Resolution ==
Resolution.Daily && _timeKeeper.IsExchangeBehindData())
350 || instance.EndTime.ConvertTo(_config.ExchangeTimeZone, _config.DataTimeZone).Date >= _timeKeeper.DataTime.Date
357 if (
Current !=
null && instance.
EndTime < _timeKeeper.ExchangeTime)
363 if (_previous !=
null && _config.Resolution !=
Resolution.Tick)
365 if (_config.IsCustomData)
369 if (instance.EndTime < _previous.
EndTime)
continue;
374 if (instance.EndTime <= _previous.
EndTime)
continue;
378 if (instance.EndTime < _periodStart)
381 _previous = instance;
387 if (instance.Time > _periodFinish)
404 UpdateDataEnumerator(
true);
406 while (_subscriptionFactoryEnumerator !=
null);
415 private void HandleNewTradableDate(
object sender, DateTime date)
418 UpdateDataEnumerator(
false);
426 private bool UpdateDataEnumerator(
bool endOfEnumerator)
431 if (_updatingDataEnumerator)
436 _updatingDataEnumerator =
true;
441 var date = _timeKeeper.DataTime.Date;
445 if (endOfEnumerator && !TryGetNextDate(out date))
447 _subscriptionFactoryEnumerator =
null;
453 var newSource = _dataFactory.
GetSource(_config, date,
false);
454 if (newSource ==
null)
461 var sourceChanged = _source != newSource && !
string.IsNullOrEmpty(newSource.Source);
465 _subscriptionFactoryEnumerator.DisposeSafely();
469 var subscriptionFactory = CreateSubscriptionFactory(newSource, _dataFactory, _dataProvider);
476 if (!endOfEnumerator)
489 _updatingDataEnumerator =
false;
495 var factory = SubscriptionDataSourceReader.ForSource(source, _dataCacheProvider, _config, _timeKeeper.DataTime.Date,
false, baseDataInstance, dataProvider, _objectStore);
496 AttachEventHandlers(factory, source);
500 private void AttachEventHandlers(ISubscriptionDataSourceReader dataSourceReader,
SubscriptionDataSource source)
502 dataSourceReader.InvalidSource += (sender, args) =>
504 if (_config.IsCustomData && !_config.Type.GetBaseDataInstance().IsSparseData())
507 new DownloadFailedEventArgs(_config.Symbol,
508 "We could not fetch the requested data. " +
509 "This may not be valid data, or a failed download of custom data. " +
510 $
"Skipping source ({args.Source.Source})."));
514 switch (args.Source.TransportMedium)
523 new DownloadFailedEventArgs(_config.Symbol,
524 $
"Error downloading custom data source file, skipped: {source} " +
525 $
"Error: {args.Exception.Message}", args.Exception.StackTrace));
535 throw new ArgumentOutOfRangeException();
539 if (dataSourceReader is TextSubscriptionDataSourceReader)
542 var textSubscriptionFactory = (TextSubscriptionDataSourceReader)dataSourceReader;
544 textSubscriptionFactory.ReaderError += (sender, args) =>
547 new ReaderErrorDetectedEventArgs(_config.Symbol,
548 $
"Error invoking {_config.Symbol} data reader. " +
549 $
"Line: {args.Line} Error: {args.Exception.Message}",
550 args.Exception.StackTrace));
560 private bool TryGetNextDate(out DateTime date)
562 while (_timeKeeper.TryAdvanceUntilNextDataDate())
564 date = _timeKeeper.DataTime.Date;
572 if (_previous !=
null && _previous.
EndTime.ConvertTo(_config.ExchangeTimeZone, _config.DataTimeZone) > date)
581 if (_timeKeeper.ExchangeTime.Date > _delistingDate)
583 _pastDelistedDate =
true;
587 date = DateTime.MaxValue.Date;
597 throw new NotImplementedException(
"Reset method not implemented. Assumes loop will only be used once.");
605 _subscriptionFactoryEnumerator.DisposeSafely();
609 _timeKeeper.NewExchangeDate -= HandleNewTradableDate;
610 _timeKeeper.DisposeSafely();