Lean  $LEAN_TAG$
LiveTradingDataFeed.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  *
15 */
16 
17 using System;
18 using System.Linq;
19 using QuantConnect.Data;
20 using QuantConnect.Util;
21 using QuantConnect.Logging;
22 using QuantConnect.Packets;
25 using System.Collections.Generic;
34 
36 {
37  /// <summary>
38  /// Provides an implementation of <see cref="IDataFeed"/> that is designed to deal with
39  /// live, remote data sources
40  /// </summary>
42  {
43  private static readonly int MaximumWarmupHistoryDaysLookBack = Config.GetInt("maximum-warmup-history-days-look-back", 5);
44 
45  private LiveNodePacket _job;
46 
47  // used to get current time
48  private ITimeProvider _timeProvider;
49  private IAlgorithm _algorithm;
50  private ITimeProvider _frontierTimeProvider;
51  private IDataProvider _dataProvider;
52  private IMapFileProvider _mapFileProvider;
53  private IDataQueueHandler _dataQueueHandler;
54  private BaseDataExchange _customExchange;
55  private SubscriptionCollection _subscriptions;
56  private IFactorFileProvider _factorFileProvider;
57  private IDataChannelProvider _channelProvider;
58  // in live trading we delay scheduled universe selection to between 11 and 12 hours after midnight UTC (11h plus the current second, 0-59, used as minutes) so that new selection data has time to be piped in
59  // NY goes from -4/-5 UTC time, so:
60  // 11 UTC - 4 => 7am NY
61  // 12 UTC - 4 => 8am NY
62  private readonly TimeSpan _scheduledUniverseUtcTimeShift = TimeSpan.FromMinutes(11 * 60 + DateTime.UtcNow.Second);
63  private readonly HashSet<string> _unsupportedConfigurations = new();
64 
/// <summary>
/// Gets a value indicating whether the data feed is still running. The flag is raised
/// during initialization and cleared again when <see cref="Exit"/> is called.
/// </summary>
public bool IsActive { get; private set; }
72 
73  /// <summary>
74  /// Initializes the data feed for the specified job and algorithm
75  /// </summary>
76  public override void Initialize(IAlgorithm algorithm,
78  IResultHandler resultHandler,
79  IMapFileProvider mapFileProvider,
80  IFactorFileProvider factorFileProvider,
81  IDataProvider dataProvider,
82  IDataFeedSubscriptionManager subscriptionManager,
83  IDataFeedTimeProvider dataFeedTimeProvider,
84  IDataChannelProvider dataChannelProvider)
85  {
86  if (!(job is LiveNodePacket))
87  {
88  throw new ArgumentException("The LiveTradingDataFeed requires a LiveNodePacket.");
89  }
90 
91  _algorithm = algorithm;
92  _job = (LiveNodePacket)job;
93  _timeProvider = dataFeedTimeProvider.TimeProvider;
94  _dataProvider = dataProvider;
95  _mapFileProvider = mapFileProvider;
96  _factorFileProvider = factorFileProvider;
97  _channelProvider = dataChannelProvider;
98  _frontierTimeProvider = dataFeedTimeProvider.FrontierTimeProvider;
99  _customExchange = GetBaseDataExchange();
100  _subscriptions = subscriptionManager.DataFeedSubscriptions;
101 
102  _dataQueueHandler = GetDataQueueHandler();
103  _dataQueueHandler?.SetJob(_job);
104 
105  // run the custom data exchange
106  _customExchange.Start();
107 
108  IsActive = true;
109 
110  base.Initialize(algorithm, job, resultHandler, mapFileProvider, factorFileProvider, dataProvider, subscriptionManager, dataFeedTimeProvider, dataChannelProvider);
111  }
112 
113  /// <summary>
114  /// Creates a new subscription to provide data for the specified security.
115  /// </summary>
116  /// <param name="request">Defines the subscription to be added, including start/end times the universe and security</param>
117  /// <returns>The created <see cref="Subscription"/> if successful, null otherwise</returns>
119  {
120  Subscription subscription = null;
121  try
122  {
123  // create and add the subscription to our collection
124  subscription = request.IsUniverseSubscription
125  ? CreateUniverseSubscription(request)
126  : CreateDataSubscription(request);
127  }
128  catch (Exception err)
129  {
130  Log.Error(err, $"CreateSubscription(): Failed configuration: '{request.Configuration}'");
131  // kill the algorithm, this shouldn't happen
132  _algorithm.SetRuntimeError(err, $"Failed to subscribe to {request.Configuration.Symbol}");
133  }
134 
135  return subscription;
136  }
137 
/// <summary>
/// Detaches the given subscription from whichever source is feeding it
/// </summary>
/// <param name="subscription">The subscription to remove</param>
public override void RemoveSubscription(Subscription subscription)
{
    var configuration = subscription.Configuration;

    if (_channelProvider.ShouldStreamSubscription(configuration))
    {
        // streamed subscriptions are served by the data queue handler
        _dataQueueHandler.UnsubscribeWithMapping(configuration);
        return;
    }

    // everything else is polled through the custom data exchange
    _customExchange.RemoveEnumerator(configuration.Symbol);
}
156 
/// <summary>
/// Shuts the data feed down. Calling it again once the feed is no longer active does nothing.
/// </summary>
public override void Exit()
{
    if (!IsActive)
    {
        return;
    }

    IsActive = false;
    Log.Trace("LiveTradingDataFeed.Exit(): Start. Setting cancellation token...");

    // stop listening for unsupported configuration notifications
    if (_dataQueueHandler is DataQueueHandlerManager queueHandlerManager)
    {
        queueHandlerManager.UnsupportedConfiguration -= HandleUnsupportedConfigurationEvent;
    }

    _customExchange?.Stop();
    Log.Trace("LiveTradingDataFeed.Exit(): Exit Finished.");

    base.Exit();
}
176 
177  /// <summary>
178  /// Gets the <see cref="IDataQueueHandler"/> to use by default <see cref="DataQueueHandlerManager"/>
179  /// </summary>
180  /// <remarks>Useful for testing</remarks>
181  /// <returns>The loaded <see cref="IDataQueueHandler"/></returns>
183  {
184  var result = new DataQueueHandlerManager(_algorithm.Settings);
185  result.UnsupportedConfiguration += HandleUnsupportedConfigurationEvent;
186  return result;
187  }
188 
189  /// <summary>
190  /// Gets the <see cref="BaseDataExchange"/> to use
191  /// </summary>
192  /// <remarks>Useful for testing</remarks>
194  {
195  return new BaseDataExchange("CustomDataExchange") { SleepInterval = 100 };
196  }
197 
198  /// <summary>
199  /// Creates a new subscription for the specified security
200  /// </summary>
201  /// <param name="request">The subscription request</param>
202  /// <returns>A new subscription instance of the specified security</returns>
203  private Subscription CreateDataSubscription(SubscriptionRequest request)
204  {
205  Subscription subscription = null;
206 
207  var localEndTime = request.EndTimeUtc.ConvertFromUtc(request.Security.Exchange.TimeZone);
208  var timeZoneOffsetProvider = new TimeZoneOffsetProvider(request.Configuration.ExchangeTimeZone, request.StartTimeUtc, request.EndTimeUtc);
209 
210  IEnumerator<BaseData> enumerator = null;
211  if (!_channelProvider.ShouldStreamSubscription(request.Configuration))
212  {
213  if (!Tiingo.IsAuthCodeSet)
214  {
215  // we're not using the SubscriptionDataReader, so be sure to set the auth token here
216  Tiingo.SetAuthCode(Config.Get("tiingo-auth-token"));
217  }
218 
219  var factory = new LiveCustomDataSubscriptionEnumeratorFactory(_timeProvider, _algorithm.ObjectStore);
220  var enumeratorStack = factory.CreateEnumerator(request, _dataProvider);
221 
222  var enqueable = new EnqueueableEnumerator<BaseData>();
223  _customExchange.AddEnumerator(request.Configuration.Symbol, enumeratorStack, handleData: data =>
224  {
225  enqueable.Enqueue(data);
226 
227  subscription?.OnNewDataAvailable();
228  });
229 
230  enumerator = enqueable;
231  }
232  else
233  {
234  var auxEnumerators = new List<IEnumerator<BaseData>>();
235 
236  if (LiveAuxiliaryDataEnumerator.TryCreate(request.Configuration, _timeProvider, request.Security.Cache, _mapFileProvider,
237  _factorFileProvider, request.StartTimeLocal, out var auxDataEnumator))
238  {
239  auxEnumerators.Add(auxDataEnumator);
240  }
241 
242  EventHandler handler = (_, _) => subscription?.OnNewDataAvailable();
243  enumerator = Subscribe(request.Configuration, handler, IsExpired);
244 
245  if (auxEnumerators.Count > 0)
246  {
247  enumerator = new LiveAuxiliaryDataSynchronizingEnumerator(_timeProvider, request.Configuration.ExchangeTimeZone, enumerator, auxEnumerators);
248  }
249  }
250 
251  // scale prices before 'SubscriptionFilterEnumerator' since it updates securities realtime price
252  // and before fill forwarding so we don't happen to apply twice the factor
253  if (request.Configuration.PricesShouldBeScaled(liveMode: true))
254  {
255  enumerator = new PriceScaleFactorEnumerator(
256  enumerator,
257  request.Configuration,
258  _factorFileProvider,
259  liveMode: true);
260  }
261 
262  if (request.Configuration.FillDataForward)
263  {
264  var fillForwardResolution = _subscriptions.UpdateAndGetFillForwardResolution(request.Configuration);
265  // Pass the security exchange hours explicitly to avoid using the ones in the request, since
266  // those could be different. e.g. when requests are created for open interest data the exchange
267  // hours are set to always open to avoid OI data being filtered out due to the exchange being closed.
268  var useDailyStrictEndTimes = LeanData.UseDailyStrictEndTimes(_algorithm.Settings, request, request.Configuration.Symbol, request.Configuration.Increment, request.Security.Exchange.Hours);
269 
270  enumerator = new LiveFillForwardEnumerator(_frontierTimeProvider, enumerator, request.Security.Exchange, fillForwardResolution,
272  useDailyStrictEndTimes, request.Configuration.Type);
273  }
274 
275  // make our subscriptions aware of the frontier of the data feed, prevents future data from spewing into the feed
276  enumerator = new FrontierAwareEnumerator(enumerator, _frontierTimeProvider, timeZoneOffsetProvider);
277 
278  // apply market hours and user filters to incoming data after the frontier enumerator, so that during warmup we avoid any realtime data making its way into the securities
280  {
281  enumerator = new SubscriptionFilterEnumerator(enumerator, request.Security, localEndTime, request.Configuration.ExtendedMarketHours, true, request.ExchangeHours);
282  }
283 
284  enumerator = GetWarmupEnumerator(request, enumerator);
285 
286  var subscriptionDataEnumerator = new SubscriptionDataEnumerator(request.Configuration, request.Security.Exchange.Hours, timeZoneOffsetProvider,
287  enumerator, request.IsUniverseSubscription, _algorithm.Settings.DailyPreciseEndTime);
288  subscription = new Subscription(request, subscriptionDataEnumerator, timeZoneOffsetProvider);
289 
290  return subscription;
291  }
292 
/// <summary>
/// Determines whether the symbol of the given configuration has already passed its delisting date
/// </summary>
/// <remarks>During warmup we can be asked to add an asset that has already expired. Such assets should not be
/// sent to the live <see cref="_dataQueueHandler"/> instance, although their warmup enumerators are still wanted</remarks>
/// <param name="dataConfig">The configuration whose symbol is checked</param>
/// <returns>True when the current UTC date is after the delisting date converted to UTC</returns>
private bool IsExpired(SubscriptionDataConfig dataConfig)
{
    var resolvedMapFile = _mapFileProvider.ResolveMapFile(dataConfig);
    var delisting = dataConfig.Symbol.GetDelistingDate(resolvedMapFile);

    var utcToday = _timeProvider.GetUtcNow().Date;
    var delistingUtc = delisting.ConvertToUtc(dataConfig.ExchangeTimeZone);
    return utcToday > delistingUtc;
}
304 
/// <summary>
/// Creates the live enumerator for the given configuration on top of the data queue handler
/// </summary>
/// <param name="dataConfig">The configuration to subscribe</param>
/// <param name="newDataAvailableHandler">Handler handed to the live enumerator for new data notifications</param>
/// <param name="isExpired">Predicate handed to the live enumerator to detect expired configurations</param>
/// <returns>A new <see cref="LiveSubscriptionEnumerator"/></returns>
private IEnumerator<BaseData> Subscribe(SubscriptionDataConfig dataConfig, EventHandler newDataAvailableHandler, Func<SubscriptionDataConfig, bool> isExpired)
    => new LiveSubscriptionEnumerator(dataConfig, _dataQueueHandler, newDataAvailableHandler, isExpired);
309 
/// <summary>
/// Builds the subscription that drives universe selection for the requested universe
/// </summary>
/// <param name="request">The universe subscription request</param>
/// <returns>The new universe <see cref="Subscription"/></returns>
private Subscription CreateUniverseSubscription(SubscriptionRequest request)
{
    // assigned at the very end; the data handlers created below capture this variable and call it
    // through the null-conditional operator, so they do nothing until the subscription exists
    Subscription subscription = null;

    // TODO : Consider moving the creating of universe subscriptions to a separate, testable class

    var config = request.Universe.Configuration;
    var localEndTime = request.EndTimeUtc.ConvertFromUtc(request.Security.Exchange.TimeZone);
    var tzOffsetProvider = new TimeZoneOffsetProvider(request.Configuration.ExchangeTimeZone, request.StartTimeUtc, request.EndTimeUtc);

    IEnumerator<BaseData> enumerator;

    if (request.Universe is ITimeTriggeredUniverse timeTriggered)
    {
        Log.Trace($"LiveTradingDataFeed.CreateUniverseSubscription(): Creating user defined universe: {config.Symbol.ID}");

        // emit a synthetic tick on the requested interval so the universe selection function gets triggered
        var triggerFactory = new TimeTriggeredUniverseSubscriptionEnumeratorFactory(timeTriggered, MarketHoursDatabase.FromDataFolder(), _frontierTimeProvider);
        var triggerEnumerator = new FrontierAwareEnumerator(triggerFactory.CreateEnumerator(request, _dataProvider), _timeProvider, tzOffsetProvider);

        var queue = new EnqueueableEnumerator<BaseData>();
        _customExchange.AddEnumerator(new EnumeratorHandler(config.Symbol, triggerEnumerator, queue));
        enumerator = queue;
    }
    else if (config.Type.IsAssignableTo(typeof(ETFConstituentUniverse)) ||
        config.Type.IsAssignableTo(typeof(FundamentalUniverse)) ||
        (request.Universe is OptionChainUniverse && request.Configuration.SecurityType != SecurityType.FutureOption))
    {
        Log.Trace($"LiveTradingDataFeed.CreateUniverseSubscription(): Creating {config.Type.Name} universe: {config.Symbol.ID}");

        // Polls the data folder every 10 minutes looking for the file dated with the previous tradable day.
        // If lean is started today it will trigger the initial universe selection
        var fileFactory = new LiveCustomDataSubscriptionEnumeratorFactory(
            _timeProvider,
            _algorithm.ObjectStore,
            // shift the requested time back to the previous tradable date
            referenceTime => Time.GetStartTimeForTradeBars(request.Security.Exchange.Hours, referenceTime, Time.OneDay, 1, false, config.DataTimeZone, _algorithm.Settings.DailyPreciseEndTime),
            TimeSpan.FromMinutes(10));

        // group the individual data points into a single BaseDataCollection
        var collectionEnumerator = new BaseDataCollectionAggregatorEnumerator(fileFactory.CreateEnumerator(request, _dataProvider), config.Symbol, true);

        var queue = new EnqueueableEnumerator<BaseData>();
        _customExchange.AddEnumerator(config.Symbol, collectionEnumerator, handleData: point =>
        {
            queue.Enqueue(point);
            subscription?.OnNewDataAvailable();
        });

        enumerator = GetConfiguredFrontierAwareEnumerator(queue, tzOffsetProvider,
            // time only advances when the hour is between 6 and 22 (inclusive) and it is not Saturday
            now => now.Hour < 23 && now.Hour > 5 && now.DayOfWeek != DayOfWeek.Saturday);
    }
    else if (request.Universe is OptionChainUniverse)
    {
        Log.Trace("LiveTradingDataFeed.CreateUniverseSubscription(): Creating option chain universe: " + config.Symbol.ID);

        Func<SubscriptionRequest, IEnumerator<BaseData>> configure = subRequest =>
        {
            // The security exchange hours are passed explicitly instead of relying on the ones in the request,
            // since those could be different. e.g. requests created for open interest data use always open
            // exchange hours so that OI data is not filtered out while the exchange is closed.
            var strictDailyEndTimes = LeanData.UseDailyStrictEndTimes(_algorithm.Settings, request, request.Configuration.Symbol,
                request.Configuration.Increment, request.Security.Exchange.Hours);
            var fillForwardSpan = _subscriptions.UpdateAndGetFillForwardResolution(subRequest.Configuration);
            var liveInput = Subscribe(subRequest.Configuration, (sender, args) => subscription?.OnNewDataAvailable(), (_) => false);

            return new LiveFillForwardEnumerator(_frontierTimeProvider, liveInput, subRequest.Security.Exchange, fillForwardSpan,
                subRequest.Configuration.ExtendedMarketHours, localEndTime, subRequest.Configuration.Resolution,
                subRequest.Configuration.DataTimeZone, strictDailyEndTimes, request.Configuration.Type);
        };

        var universeProvider = GetUniverseProvider(request.Configuration.SecurityType);
        var chainFactory = new OptionChainUniverseSubscriptionEnumeratorFactory(configure, universeProvider, _timeProvider);

        enumerator = new FrontierAwareEnumerator(chainFactory.CreateEnumerator(request, _dataProvider), _frontierTimeProvider, tzOffsetProvider);
    }
    else if (request.Universe is FuturesChainUniverse)
    {
        Log.Trace("LiveTradingDataFeed.CreateUniverseSubscription(): Creating futures chain universe: " + config.Symbol.ID);

        var universeProvider = GetUniverseProvider(SecurityType.Future);
        var chainEnumerator = new DataQueueFuturesChainUniverseDataCollectionEnumerator(request, universeProvider, _timeProvider);

        enumerator = new FrontierAwareEnumerator(chainEnumerator, _frontierTimeProvider, tzOffsetProvider);
    }
    else
    {
        Log.Trace("LiveTradingDataFeed.CreateUniverseSubscription(): Creating custom universe: " + config.Symbol.ID);

        var customFactory = new LiveCustomDataSubscriptionEnumeratorFactory(_timeProvider, _algorithm.ObjectStore);
        var collectionEnumerator = new BaseDataCollectionAggregatorEnumerator(customFactory.CreateEnumerator(request, _dataProvider), config.Symbol, liveMode: true);

        var queue = new EnqueueableEnumerator<BaseData>();
        _customExchange.AddEnumerator(new EnumeratorHandler(config.Symbol, collectionEnumerator, queue));
        enumerator = queue;
    }

    // the schedule wrapper's time only advances once the time of day has passed the live time shift
    var shiftedTimeProvider = new PredicateTimeProvider(_frontierTimeProvider,
        currentUtcDateTime => currentUtcDateTime.TimeOfDay > _scheduledUniverseUtcTimeShift);
    enumerator = AddScheduleWrapper(request, enumerator, shiftedTimeProvider);

    enumerator = GetWarmupEnumerator(request, enumerator);

    // finally build the subscription on top of the full enumerator stack
    var dataEnumerator = new SubscriptionDataEnumerator(request.Configuration, request.Security.Exchange.Hours, tzOffsetProvider,
        enumerator, request.IsUniverseSubscription, _algorithm.Settings.DailyPreciseEndTime);
    subscription = new Subscription(request, dataEnumerator, tzOffsetProvider);

    return subscription;
}
432 
433  /// <summary>
434  /// Build and apply the warmup enumerators when required
435  /// </summary>
436  private IEnumerator<BaseData> GetWarmupEnumerator(SubscriptionRequest request, IEnumerator<BaseData> liveEnumerator)
437  {
438  if (_algorithm.IsWarmingUp)
439  {
440  var warmupRequest = new SubscriptionRequest(request, endTimeUtc: _timeProvider.GetUtcNow(),
441  // we will not fill forward each warmup enumerator separately, but the concatenated one below
442  configuration: new SubscriptionDataConfig(request.Configuration, fillForward: false,
443  resolution: _algorithm.Settings.WarmupResolution));
444  if (warmupRequest.TradableDaysInDataTimeZone.Any()
445  // make sure there is at least room for a single bar of the requested resolution, else can cause issues with some history providers
446  // this could happen when we create some internal subscription whose start time is 'Now', which we don't really want to warmup
447  && warmupRequest.EndTimeUtc - warmupRequest.StartTimeUtc >= warmupRequest.Configuration.Resolution.ToTimeSpan()
448  // since we change the resolution, let's validate it's still valid configuration (example daily equity quotes are not!)
449  && LeanData.IsValidConfiguration(warmupRequest.Configuration.SecurityType, warmupRequest.Configuration.Resolution, warmupRequest.Configuration.TickType))
450  {
451  // since we will source data locally and from the history provider, let's limit the history request size
452  // by setting a start date respecting the 'MaximumWarmupHistoryDaysLookBack'
453  var historyWarmup = warmupRequest;
454  var warmupHistoryStartDate = warmupRequest.EndTimeUtc.AddDays(-MaximumWarmupHistoryDaysLookBack);
455  if (warmupHistoryStartDate > warmupRequest.StartTimeUtc)
456  {
457  historyWarmup = new SubscriptionRequest(warmupRequest, startTimeUtc: warmupHistoryStartDate);
458  }
459 
460  // let's keep track of the last point we got from the file based enumerator and start our history enumeration from this point
461  // this is much more efficient since otherwise these duplicated points would be dropped by the filter right away, causing memory usage spikes
462  var lastPointTracker = new LastPointTracker();
463 
464  var synchronizedWarmupEnumerator = TryAddFillForwardEnumerator(warmupRequest,
465  // we concatenate the file based and history based warmup enumerators, dropping duplicate time stamps
466  new ConcatEnumerator(true, GetFileBasedWarmupEnumerator(warmupRequest, lastPointTracker), GetHistoryWarmupEnumerator(historyWarmup, lastPointTracker)) { CanEmitNull = false },
467  // if required by the original request, we will fill forward the Synced warmup data
469  _algorithm.Settings.WarmupResolution);
470  synchronizedWarmupEnumerator = AddScheduleWrapper(warmupRequest, synchronizedWarmupEnumerator, null);
471 
472  // don't let future data past. We let null pass because that's letting the next enumerator know we've ended because we always return true in live
473  synchronizedWarmupEnumerator = new FilterEnumerator<BaseData>(synchronizedWarmupEnumerator, data => data == null || data.EndTime <= warmupRequest.EndTimeLocal);
474 
475  // the order here is important, concat enumerator will keep the last enumerator given and dispose of the rest
476  liveEnumerator = new ConcatEnumerator(true, synchronizedWarmupEnumerator, liveEnumerator);
477  }
478  }
479  return liveEnumerator;
480  }
481 
/// <summary>
/// Creates the warmup enumerator for the given request through <c>CreateEnumerator</c>, recording the last
/// data point that was let through in <paramref name="lastPointTracker"/>
/// </summary>
/// <param name="warmup">The warmup subscription request</param>
/// <param name="lastPointTracker">Receives the last non null data point that passed the filter</param>
/// <returns>The filtered enumerator, or null if creating it threw</returns>
private IEnumerator<BaseData> GetFileBasedWarmupEnumerator(SubscriptionRequest warmup, LastPointTracker lastPointTracker)
{
    try
    {
        return new FilterEnumerator<BaseData>(CreateEnumerator(warmup), point =>
        {
            // nulls are always let through
            if (point == null)
            {
                return true;
            }

            // drop future data and fill forwarded points, fill forwarding is applied after merging
            // with the history request response
            if (point.EndTime < warmup.EndTimeLocal && !point.IsFillForward)
            {
                lastPointTracker.LastDataPoint = point;
                return true;
            }

            return false;
        });
    }
    catch (Exception e)
    {
        Log.Error(e, $"File based warmup: {warmup.Configuration}");
        return null;
    }
}
511 
/// <summary>
/// Creates the warmup enumerator sourced from the algorithm's history provider
/// </summary>
/// <param name="warmup">The warmup subscription request</param>
/// <param name="lastPointTracker">Optional tracker; when it holds a data point later than the request start,
/// the history request starts from that point instead</param>
/// <returns>An enumerator that lets through nulls and non fill forwarded points ending before the warmup end time</returns>
private IEnumerator<BaseData> GetHistoryWarmupEnumerator(SubscriptionRequest warmup, LastPointTracker lastPointTracker)
{
    IEnumerator<BaseData> source;
    if (warmup.IsUniverseSubscription)
    {
        // the fill forward time span argument is ignored, the concatenated file and history based
        // enumerators are fill forwarded next in the stack
        source = CreateUniverseEnumerator(warmup, createUnderlyingEnumerator: (req, _) => GetHistoryWarmupEnumerator(req, lastPointTracker));
    }
    else
    {
        // going through an enumerable defers the creation of the history request until enumeration starts,
        // that is after the file based enumeration ended, when 'lastPointTracker' may hold a point to start from
        source = new[] { warmup }.SelectMany(_ => RequestHistory()).GetEnumerator();
    }

    // drop future data and fill forwarded points, fill forwarding is applied after merging
    // with the file based enumerator
    return new FilterEnumerator<BaseData>(source,
        data => data == null || (data.EndTime < warmup.EndTimeLocal && !data.IsFillForward));

    // Performs the history request, starting from the last tracked data point when it is later than the request start
    IEnumerable<BaseData> RequestHistory()
    {
        var startTimeUtc = warmup.StartTimeUtc;

        var lastPoint = lastPointTracker?.LastDataPoint;
        if (lastPoint != null)
        {
            // time could be 9.30 for example using strict daily end times, for daily data we only want the date
            var lastPointExchangeTime = warmup.Configuration.Resolution == Resolution.Daily
                ? lastPoint.Time.Date
                : lastPoint.Time;

            var utcLastPointTime = lastPointExchangeTime.ConvertToUtc(warmup.ExchangeHours.TimeZone);
            if (utcLastPointTime > startTimeUtc)
            {
                if (Log.DebuggingEnabled)
                {
                    Log.Debug($"LiveTradingDataFeed.GetHistoryWarmupEnumerator(): Adjusting history warmup start time to {utcLastPointTime} from {startTimeUtc} for {warmup.Configuration}");
                }
                startTimeUtc = utcLastPointTime;
            }
        }

        var historyRequest = new Data.HistoryRequest(warmup.Configuration, warmup.ExchangeHours, startTimeUtc, warmup.EndTimeUtc);
        try
        {
            return _algorithm.HistoryProvider.GetHistory(new[] { historyRequest }, _algorithm.TimeZone).Select(slice =>
            {
                try
                {
                    var sliceData = slice.Get(historyRequest.DataType);
                    return (BaseData)sliceData[warmup.Configuration.Symbol];
                }
                catch (Exception e)
                {
                    Log.Error(e, $"History warmup: {warmup.Configuration}");
                    return null;
                }
            });
        }
        catch
        {
            // some history providers could throw if they do not support a type
            return Enumerable.Empty<BaseData>();
        }
    }
}
578 
/// <summary>
/// Wraps the given enumerator in a <see cref="FrontierAwareEnumerator"/> whose time source is a
/// <see cref="PredicateTimeProvider"/> built from the frontier time provider and the supplied function
/// </summary>
/// <param name="enumerator">The enumerator to wrap</param>
/// <param name="tzOffsetProvider">The time zone offset provider handed to the frontier aware enumerator</param>
/// <param name="customStepEvaluator">Function handed to the <see cref="PredicateTimeProvider"/> to decide
/// whether time may advance for a given date time</param>
/// <returns>The wrapping <see cref="FrontierAwareEnumerator"/></returns>
private IEnumerator<BaseData> GetConfiguredFrontierAwareEnumerator(
    IEnumerator<BaseData> enumerator,
    TimeZoneOffsetProvider tzOffsetProvider,
    Func<DateTime, bool> customStepEvaluator)
{
    return new FrontierAwareEnumerator(
        enumerator,
        new PredicateTimeProvider(_frontierTimeProvider, customStepEvaluator),
        tzOffsetProvider);
}
596 
/// <summary>
/// Gets the data queue handler as an <see cref="IDataQueueUniverseProvider"/>
/// </summary>
/// <param name="securityType">The security type, only used in the error message</param>
/// <returns>The data queue handler cast to <see cref="IDataQueueUniverseProvider"/></returns>
/// <exception cref="NotSupportedException">The data queue handler is not a universe provider, or it is a
/// <see cref="DataQueueHandlerManager"/> reporting that it has no universe provider</exception>
private IDataQueueUniverseProvider GetUniverseProvider(SecurityType securityType)
{
    var managerWithoutProvider = _dataQueueHandler is DataQueueHandlerManager { HasUniverseProvider: false };
    if (!managerWithoutProvider && _dataQueueHandler is IDataQueueUniverseProvider universeProvider)
    {
        return universeProvider;
    }

    throw new NotSupportedException($"The DataQueueHandler does not support {securityType}.");
}
605 
/// <summary>
/// Handles unsupported configuration notifications, logging and warning the algorithm once per
/// market, security type and data type combination
/// </summary>
/// <param name="_">The event sender, unused</param>
/// <param name="config">The configuration reported as unsupported</param>
private void HandleUnsupportedConfigurationEvent(object _, SubscriptionDataConfig config)
{
    if (_algorithm == null)
    {
        return;
    }

    lock (_unsupportedConfigurations)
    {
        var key = $"{config.Symbol.ID.Market} {config.Symbol.ID.SecurityType} {config.Type.Name}";

        // Add returns false when this combination was already reported
        if (!_unsupportedConfigurations.Add(key))
        {
            return;
        }

        Log.Trace($"LiveTradingDataFeed.HandleUnsupportedConfigurationEvent(): detected unsupported configuration: {config}");

        _algorithm.Debug($"Warning: {key} data not supported. Please consider reviewing the data providers selection.");
    }
}
622 
/// <summary>
/// Enumerator handler that pushes every data point into an <see cref="EnqueueableEnumerator{T}"/>
/// and stops that enumerator once the source enumerator has finished
/// </summary>
private class EnumeratorHandler : BaseDataExchange.EnumeratorHandler
{
    /// <summary>
    /// Creates a new handler for the given symbol
    /// </summary>
    /// <param name="symbol">The symbol the enumerator belongs to</param>
    /// <param name="enumerator">The source enumerator</param>
    /// <param name="enqueueable">The enumerator that receives the data and is stopped when the source finishes</param>
    public EnumeratorHandler(Symbol symbol, IEnumerator<BaseData> enumerator, EnqueueableEnumerator<BaseData> enqueueable)
        : base(symbol, enumerator, handleData: enqueueable.Enqueue)
    {
        EnumeratorFinished += (sender, args) => enqueueable.Stop();
    }
}
634 
/// <summary>
/// Mutable holder for the most recent data point seen by an enumerator
/// </summary>
private class LastPointTracker
{
    /// <summary>
    /// The last tracked data point, null until one has been set
    /// </summary>
    public BaseData LastDataPoint
    {
        get; set;
    }
}
639  }
640 }